[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717089#comment-16717089
 ] 

ASF GitHub Bot commented on SPARK-20568:


gaborgsomogyi commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r240168181
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -329,4 +345,124 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  class FileStreamSourceCleaner(fileSystem: FileSystem, sourcePath: Path,
 
 Review comment:
  Why is `FileStreamSourceCleaner` public?
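
One way the question could go, sketched purely as an illustration (not the PR's actual resolution), is to narrow the class to the package that uses it:

    // Hypothetical sketch: restrict visibility unless callers outside the
    // streaming package genuinely need the cleaner.
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.internal.Logging

    private[streaming] class FileStreamSourceCleaner(
        fileSystem: FileSystem,
        sourcePath: Path,
        baseArchivePathString: Option[String]) extends Logging {
      // ... archive()/remove() as in the diff above ...
    }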


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 
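
The pull request referenced in the comments above approaches this through a source option. Judging from the test code quoted in the review threads below, completed files can either be deleted or moved aside via a `cleanSource` option, with `sourceArchiveDir` naming the archive location. A minimal usage sketch along those lines (e.g. pasted into spark-shell), using hypothetical paths and the option names from the PR's tests rather than any released API:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{LongType, StringType, StructType}

    val spark = SparkSession.builder().appName("json-to-parquet").getOrCreate()

    // Hypothetical schema and paths, for illustration only.
    val schema = new StructType().add("id", LongType).add("payload", StringType)

    val input = spark.readStream
      .format("json")
      .schema(schema)                              // streaming file sources need an explicit schema
      .option("cleanSource", "archive")            // or "delete"; option name taken from the PR's tests
      .option("sourceArchiveDir", "/data/archive") // only consulted by the "archive" mode
      .load("/data/incoming")

    input.writeStream
      .format("parquet")
      .option("checkpointLocation", "/data/checkpoints/json-to-parquet")
      .start("/data/parquet")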



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717090#comment-16717090
 ] 

ASF GitHub Bot commented on SPARK-20568:


gaborgsomogyi commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r240578220
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##
 @@ -1494,6 +1512,287 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       newSource.getBatch(None, FileStreamSourceOffset(1))
     }
   }
+
+  test("remove completed files when remove option is enabled") {
+    def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
+      assert(!files.exists(_.startsWith(fileName)))
+    }
+
+    def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = {
+      assert(files.exists(_.startsWith(fileName)))
+    }
+
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "delete")
+
+        val fileStream = createFileStream("text", src.getCanonicalPath, options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        testStream(filtered)(
+          AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            // it doesn't rename any file yet
+            assertFileIsNotRemoved(src.list(), "keep1")
+            true
+          },
+          AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it renames input file for first batch, but not for second batch yet
+            assertFileIsRemoved(files, "keep1")
+            assertFileIsNotRemoved(files, "ke ep2 %")
+
+            true
+          },
+          AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
+          CheckAnswer("keep1", "keep2", "keep3"),
+          AssertOnQuery("input file renamed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it renames input file for second batch, but not third batch yet
+            assertFileIsRemoved(files, "ke ep2 %")
+            assertFileIsNotRemoved(files, "keep3")
+
+            true
+          }
+        )
+      }
+    }
+  }
+
+  test("move completed files to archive directory when archive option is enabled") {
+
+    withThreeTempDirs { case (src, tmp, archiveDir) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "archive", "sourceArchiveDir" -> archiveDir.getAbsolutePath)
+
+        val fileStream = createFileStream("text", s"${src.getCanonicalPath}/*/*",
+          options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        // src/k %1
+        // file: src/k1 %1/keep1
+        val dirForKeep1 = new File(src, "k %1")
+        // src/k1/k 2
+        // file: src/k1/k 2/keep2
+        val dirForKeep2 = new File(dirForKeep1, "k 2")
+        // src/k3
+        // file: src/k3/keep3
+        val dirForKeep3 = new File(src, "k3")
+
+        val expectedMovedDir1 = new File(archiveDir.getAbsolutePath + dirForKeep1.toURI.getPath)
+        val expectedMovedDir2 = new File(archiveDir.getAbsolutePath + dirForKeep2.toURI.getPath)
+        val expectedMovedDir3 = new File(archiveDir.getAbsolutePath + dirForKeep3.toURI.getPath)
+
+        testStream(filtered)(
+          AddTextFileData("keep1", dirForKeep1, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it doesn't rename any file yet
+            assertFileIsNotMoved(dirForKeep1, expectedMovedDir1, "keep1")
+            true
+          },
+          AddTextFileData("keep2", dirForKeep2, tmp, tmpFilePrefix = "keep2 %"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it renames input file for first batch, but not for second batch yet
+            assertFileIsMoved(dirForKeep1, expectedMovedDir1, "keep1")
+            assertFileIsNotMoved(dirForKeep2, expectedMovedDir2, "keep2 %

[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717096#comment-16717096
 ] 

ASF GitHub Bot commented on SPARK-20568:


gaborgsomogyi commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r240569488
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -329,4 +345,124 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  class FileStreamSourceCleaner(fileSystem: FileSystem, sourcePath: Path,
+      baseArchivePathString: Option[String]) extends Logging {
+
+    private val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePath)
+
+    private val baseArchivePath: Option[Path] = baseArchivePathString.map(new Path(_))
+
+    def archive(entry: FileEntry): Unit = {
+      require(baseArchivePath.isDefined)
+
+      val curPath = new Path(new URI(entry.path))
+      val curPathUri = curPath.toUri
+
+      val newPath = buildArchiveFilePath(curPathUri)
+
+      if (baseArchivePath.get.depth() <= 2) {
 
 Review comment:
  I just don't understand this hardcoded 2. Maybe it belongs in `isArchiveFileMatchedAgainstSourcePattern`?
  If the depth check can move there, this might collapse into a single condition, but I'm not sure.




> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717093#comment-16717093
 ] 

ASF GitHub Bot commented on SPARK-20568:


gaborgsomogyi commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r240531740
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -329,4 +345,124 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  class FileStreamSourceCleaner(fileSystem: FileSystem, sourcePath: Path,
+baseArchivePathString: Option[String]) extends Logging {
 
 Review comment:
   Nit: indent




> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717092#comment-16717092
 ] 

ASF GitHub Bot commented on SPARK-20568:


gaborgsomogyi commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r240600729
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##
 @@ -1494,6 +1512,287 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       newSource.getBatch(None, FileStreamSourceOffset(1))
     }
   }
+
+  test("remove completed files when remove option is enabled") {
+    def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
+      assert(!files.exists(_.startsWith(fileName)))
+    }
+
+    def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = {
+      assert(files.exists(_.startsWith(fileName)))
+    }
+
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "delete")
+
+        val fileStream = createFileStream("text", src.getCanonicalPath, options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        testStream(filtered)(
+          AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            // it doesn't rename any file yet
+            assertFileIsNotRemoved(src.list(), "keep1")
+            true
+          },
+          AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it renames input file for first batch, but not for second batch yet
+            assertFileIsRemoved(files, "keep1")
+            assertFileIsNotRemoved(files, "ke ep2 %")
+
+            true
+          },
+          AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
+          CheckAnswer("keep1", "keep2", "keep3"),
+          AssertOnQuery("input file renamed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it renames input file for second batch, but not third batch yet
+            assertFileIsRemoved(files, "ke ep2 %")
+            assertFileIsNotRemoved(files, "keep3")
+
+            true
+          }
+        )
+      }
+    }
+  }
+
+  test("move completed files to archive directory when archive option is enabled") {
+
+    withThreeTempDirs { case (src, tmp, archiveDir) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "archive", "sourceArchiveDir" -> archiveDir.getAbsolutePath)
+
+        val fileStream = createFileStream("text", s"${src.getCanonicalPath}/*/*",
+          options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        // src/k %1
+        // file: src/k1 %1/keep1
+        val dirForKeep1 = new File(src, "k %1")
+        // src/k1/k 2
+        // file: src/k1/k 2/keep2
+        val dirForKeep2 = new File(dirForKeep1, "k 2")
+        // src/k3
+        // file: src/k3/keep3
+        val dirForKeep3 = new File(src, "k3")
+
+        val expectedMovedDir1 = new File(archiveDir.getAbsolutePath + dirForKeep1.toURI.getPath)
+        val expectedMovedDir2 = new File(archiveDir.getAbsolutePath + dirForKeep2.toURI.getPath)
+        val expectedMovedDir3 = new File(archiveDir.getAbsolutePath + dirForKeep3.toURI.getPath)
+
+        testStream(filtered)(
+          AddTextFileData("keep1", dirForKeep1, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it doesn't rename any file yet
+            assertFileIsNotMoved(dirForKeep1, expectedMovedDir1, "keep1")
+            true
+          },
+          AddTextFileData("keep2", dirForKeep2, tmp, tmpFilePrefix = "keep2 %"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it renames input file for first batch, but not for second batch yet
+            assertFileIsMoved(dirForKeep1, expectedMovedDir1, "keep1")
+            assertFileIsNotMoved(dirForKeep2, expectedMovedDir2, "keep2 %

[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717095#comment-16717095
 ] 

ASF GitHub Bot commented on SPARK-20568:


gaborgsomogyi commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r240573844
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##
 @@ -1494,6 +1512,287 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       newSource.getBatch(None, FileStreamSourceOffset(1))
     }
   }
+
+  test("remove completed files when remove option is enabled") {
+    def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
+      assert(!files.exists(_.startsWith(fileName)))
+    }
+
+    def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = {
+      assert(files.exists(_.startsWith(fileName)))
+    }
+
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "delete")
+
+        val fileStream = createFileStream("text", src.getCanonicalPath, options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        testStream(filtered)(
+          AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            // it doesn't rename any file yet
+            assertFileIsNotRemoved(src.list(), "keep1")
+            true
+          },
+          AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it renames input file for first batch, but not for second batch yet
+            assertFileIsRemoved(files, "keep1")
+            assertFileIsNotRemoved(files, "ke ep2 %")
+
+            true
+          },
+          AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
+          CheckAnswer("keep1", "keep2", "keep3"),
+          AssertOnQuery("input file renamed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it renames input file for second batch, but not third batch yet
+            assertFileIsRemoved(files, "ke ep2 %")
+            assertFileIsNotRemoved(files, "keep3")
+
+            true
+          }
+        )
+      }
+    }
+  }
+
+  test("move completed files to archive directory when archive option is enabled") {
+
+    withThreeTempDirs { case (src, tmp, archiveDir) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "archive", "sourceArchiveDir" -> archiveDir.getAbsolutePath)
+
+        val fileStream = createFileStream("text", s"${src.getCanonicalPath}/*/*",
+          options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        // src/k %1
+        // file: src/k1 %1/keep1
+        val dirForKeep1 = new File(src, "k %1")
+        // src/k1/k 2
+        // file: src/k1/k 2/keep2
+        val dirForKeep2 = new File(dirForKeep1, "k 2")
+        // src/k3
+        // file: src/k3/keep3
+        val dirForKeep3 = new File(src, "k3")
+
+        val expectedMovedDir1 = new File(archiveDir.getAbsolutePath + dirForKeep1.toURI.getPath)
+        val expectedMovedDir2 = new File(archiveDir.getAbsolutePath + dirForKeep2.toURI.getPath)
+        val expectedMovedDir3 = new File(archiveDir.getAbsolutePath + dirForKeep3.toURI.getPath)
+
+        testStream(filtered)(
+          AddTextFileData("keep1", dirForKeep1, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it doesn't rename any file yet
+            assertFileIsNotMoved(dirForKeep1, expectedMovedDir1, "keep1")
+            true
+          },
+          AddTextFileData("keep2", dirForKeep2, tmp, tmpFilePrefix = "keep2 %"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it renames input file for first batch, but not for second batch yet
+            assertFileIsMoved(dirForKeep1, expectedMovedDir1, "keep1")
+            assertFileIsNotMoved(dirForKeep2, expectedMovedDir2, "keep2 %

[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717091#comment-16717091
 ] 

ASF GitHub Bot commented on SPARK-20568:


gaborgsomogyi commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r240567783
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -329,4 +345,124 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  class FileStreamSourceCleaner(fileSystem: FileSystem, sourcePath: Path,
+      baseArchivePathString: Option[String]) extends Logging {
+
+    private val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePath)
+
+    private val baseArchivePath: Option[Path] = baseArchivePathString.map(new Path(_))
+
+    def archive(entry: FileEntry): Unit = {
+      require(baseArchivePath.isDefined)
+
+      val curPath = new Path(new URI(entry.path))
+      val curPathUri = curPath.toUri
+
+      val newPath = buildArchiveFilePath(curPathUri)
+
+      if (baseArchivePath.get.depth() <= 2) {
+        if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+          logWarning(s"Fail to move $curPath to $newPath - destination matches " +
+            s"to source path/pattern. Skip moving file.")
+        } else {
+          doArchive(curPath, newPath)
+        }
+      } else {
+        // there's no chance for archive file to be matched against source pattern
+        doArchive(curPath, newPath)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(new URI(entry.path))
+      try {
+        logDebug(s"Removing completed file $curPath")
+        fileSystem.delete(curPath, false)
 
 Review comment:
  It would be good to check the return value.
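
A small sketch of the suggestion (assumed shape only, not the PR's eventual code): Hadoop's `FileSystem.delete(path, recursive)` returns `false` when nothing was deleted, so the cleaner can at least log that case instead of silently moving on.

    import scala.util.control.NonFatal

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.internal.Logging

    // Hypothetical helper illustrating the review suggestion.
    class CompletedFileRemover(fileSystem: FileSystem) extends Logging {
      def remove(curPath: Path): Unit = {
        try {
          logDebug(s"Removing completed file $curPath")
          if (!fileSystem.delete(curPath, false)) {
            // delete() returns false when the file was not deleted, e.g. it is already gone
            logWarning(s"Failed to remove completed file $curPath; it may no longer exist")
          }
        } catch {
          case NonFatal(e) => logWarning(s"Error while removing completed file $curPath", e)
        }
      }
    }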




> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717094#comment-16717094
 ] 

ASF GitHub Bot commented on SPARK-20568:


gaborgsomogyi commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r240561879
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##
 @@ -1494,6 +1512,287 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       newSource.getBatch(None, FileStreamSourceOffset(1))
     }
   }
+
+  test("remove completed files when remove option is enabled") {
+    def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
+      assert(!files.exists(_.startsWith(fileName)))
+    }
+
+    def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = {
+      assert(files.exists(_.startsWith(fileName)))
+    }
+
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "delete")
+
+        val fileStream = createFileStream("text", src.getCanonicalPath, options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        testStream(filtered)(
+          AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            // it doesn't rename any file yet
+            assertFileIsNotRemoved(src.list(), "keep1")
+            true
+          },
+          AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it renames input file for first batch, but not for second batch yet
+            assertFileIsRemoved(files, "keep1")
+            assertFileIsNotRemoved(files, "ke ep2 %")
+
+            true
+          },
+          AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
+          CheckAnswer("keep1", "keep2", "keep3"),
+          AssertOnQuery("input file renamed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it renames input file for second batch, but not third batch yet
+            assertFileIsRemoved(files, "ke ep2 %")
+            assertFileIsNotRemoved(files, "keep3")
+
+            true
+          }
+        )
+      }
+    }
+  }
+
+  test("move completed files to archive directory when archive option is enabled") {
+
+    withThreeTempDirs { case (src, tmp, archiveDir) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "archive", "sourceArchiveDir" -> archiveDir.getAbsolutePath)
+
+        val fileStream = createFileStream("text", s"${src.getCanonicalPath}/*/*",
+          options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        // src/k %1
+        // file: src/k1 %1/keep1
+        val dirForKeep1 = new File(src, "k %1")
+        // src/k1/k 2
+        // file: src/k1/k 2/keep2
 
 Review comment:
   Nit: I think this is `file: src/k1 %1/k 2/keep2`




> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request 

[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-11-05 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675833#comment-16675833
 ] 

Jungtaek Lim commented on SPARK-20568:
--

FYI, the trait 'Source' has a 'commit' method which is called with the previously 
processed offset for exactly this purpose, so I just took that approach.
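
For reference, a bare-bones sketch of where that hook sits in the Source API (the stubbed helpers are hypothetical; the actual PR integrates with the existing FileStreamSource rather than adding a new source):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.{Offset, Source}
    import org.apache.spark.sql.types.StructType

    // Sketch only: commit(end) is invoked once everything up to `end` has been
    // processed, which makes it a natural place to delete or archive consumed files.
    class CleaningFileSourceSketch extends Source {
      override def schema: StructType = new StructType()
      override def getOffset: Option[Offset] = None
      override def getBatch(start: Option[Offset], end: Offset): DataFrame =
        throw new UnsupportedOperationException("sketch only")

      override def commit(end: Offset): Unit = {
        filesUpTo(end).foreach(cleanUp)
      }

      override def stop(): Unit = ()

      private def filesUpTo(end: Offset): Seq[Path] = Seq.empty  // hypothetical helper
      private def cleanUp(path: Path): Unit = ()                 // hypothetical helper
    }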

> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-11-05 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675823#comment-16675823
 ] 

Apache Spark commented on SPARK-20568:
--

User 'HeartSaVioR' has created a pull request for this issue:
https://github.com/apache/spark/pull/22952

> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-11-05 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675822#comment-16675822
 ] 

Apache Spark commented on SPARK-20568:
--

User 'HeartSaVioR' has created a pull request for this issue:
https://github.com/apache/spark/pull/22952

> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-11-02 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672805#comment-16672805
 ] 

Jungtaek Lim commented on SPARK-20568:
--

[~zsxwing]
I've thought about it a bit. I'm not deeply familiar with the file stream source, but if 
I'm not missing something, a file has no "in progress" state: a file is processed within 
a single batch once it is included.

So we have two options here:

1. Delete (or move out) files which are included in finished batches recorded in the 
"sources" directory of the checkpoint.
2. Delete (or move out) files which are included in the "current" batch as soon as that 
batch completes.

If we move files out to some directory like "archive", I guess option 2 is safe. Moved 
files can be moved back to re-run a previous batch if end users really want that. 
Actually, I haven't heard of real cases where batches are removed from the checkpoint 
directory to rerun a previous batch.

What do you think about the options?
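
A rough sketch of what the "move out" variant of option 2 could look like, assuming a Hadoop FileSystem handle and a configured archive root (illustrative only, not the implementation proposed in the PR):

    import java.net.URI

    import org.apache.hadoop.fs.{FileSystem, Path}

    object BatchFileArchiver {
      // Hypothetical helper: once a batch completes, mirror each of its input files
      // under the archive root, preserving the original directory layout.
      def archiveBatchFiles(fs: FileSystem, batchFiles: Seq[String], archiveRoot: Path): Unit = {
        batchFiles.foreach { file =>
          val src = new Path(new URI(file))
          val dest = new Path(archiveRoot, src.toUri.getPath.stripPrefix("/"))
          fs.mkdirs(dest.getParent)
          if (!fs.rename(src, dest)) {
            // rename() returns false if the move failed, e.g. the file was already archived
            println(s"Could not archive $src to $dest")
          }
        }
      }
    }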

> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-10-31 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670904#comment-16670904
 ] 

Jungtaek Lim commented on SPARK-20568:
--

[~zsxwing]
Yes, I'll find a time slot and take a look at how to achieve this. Thanks for pinging me!

> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-10-31 Thread Shixiong Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670897#comment-16670897
 ] 

Shixiong Zhu commented on SPARK-20568:
--

[~kabhwan] I think this is pretty useful. Do you have time to work on this?

> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-08-15 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581979#comment-16581979
 ] 

Jungtaek Lim commented on SPARK-20568:
--

For me, this feature looks like a missing piece for streaming queries. Unlike a batch 
query, a streaming query has checkpoints which refer to specific batches, and in theory 
we should be able to remove files which were processed before the earliest available 
batch in the checkpoints, because the query can't be rolled back prior to the earliest 
checkpoint.

Stream processing assumes that events arrive indefinitely, and based on that assumption 
Spark should provide safer way(s) to move/remove old events which will never be accessed 
again. I believe the Spark Kafka connector is able to deal with Kafka retention; the 
text source should also support this.

[~srowen] [~zsxwing] Could we revisit this? If we find a benefit in supporting this, 
I'll think about how to do it and provide a patch.

> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.2.1
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2018-02-08 Thread Julian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356929#comment-16356929
 ] 

Julian commented on SPARK-20568:


I've started on data ingestion using Structured Streaming, where we will be processing 
large amounts of CSV data (later XML via Kafka, for which I hope to switch to the Kafka 
structured streaming source). In short, we need to process/transform about 6+ GB per 
minute through Spark. On smaller-scale user data sets I can understand wanting to keep 
the data, but in large-scale ELT/ETL and/or streaming flows we typically want to archive 
only the last N hours/days for recovery purposes; the raw data is just too large to keep 
(the above is just one of the 30 data sources we have already connected, and many more 
are coming). Often the upstream systems can also re-push the data, so retaining it is 
not necessary for all sources.

It is very useful for us to be able to move the data once it is processed. I have no 
choice but to implement a solution for this, but at least I now know I need to build 
something. I can think of some simple "hdfs dfs -mv" commands to achieve something like 
this, but I don't yet fully understand the relationship between the input files, each 
writer's close() method, and the parallel nature of the HDP cluster. Also, I notice that 
if the process dies and restarts, it currently reads the data again, which would be a 
disaster with this much data! I need to figure that out too.

> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Saul Shanabrook
>Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2017-06-19 Thread Fei Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16053871#comment-16053871
 ] 

Fei Shao commented on SPARK-20568:
--

I do not support this feature either.
If we delete the processed files, we cannot do any further work with them later.
If we do want this feature, we could add a flag to indicate whether processed files 
should be deleted.



> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Saul Shanabrook
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 






[jira] [Commented] (SPARK-20568) Delete files after processing in structured streaming

2017-05-03 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995507#comment-15995507
 ] 

Shixiong Zhu commented on SPARK-20568:
--

[~srowen] Structured Streaming's Source has a "commit" method to let a source discard 
data it no longer needs. This should be easy to implement in Structured Streaming.

> Delete files after processing in structured streaming
> -
>
> Key: SPARK-20568
> URL: https://issues.apache.org/jira/browse/SPARK-20568
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Saul Shanabrook
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 


