[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237481604

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

I just modified the existing UT to have a space and % in the directory name as well as the file name.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237342362

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

Will address.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237342346

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -100,6 +101,36 @@ class FileStreamSource(
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

+  ensureNoOverlapBetweenSourceAndArchivePath()
+
+  private def ensureNoOverlapBetweenSourceAndArchivePath(): Unit = {
+    @tailrec
+    def removeGlob(path: Path): Path = {
+      if (path.getName.contains("*")) {
+        removeGlob(path.getParent)
+      } else {
+        path
+      }
+    }
+
+    sourceOptions.sourceArchiveDir match {
+      case None =>
+      case Some(archiveDir) =>
+        val sourceUri = removeGlob(qualifiedBasePath).toUri
+        val archiveUri = new Path(archiveDir).toUri
+
+        val sourcePath = sourceUri.getPath
+        val archivePath = archiveUri.getPath
--- End diff --

Nice finding. Will address.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237342072

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

Yeah... actually I was somewhat confused about whether I have to escape/unescape the path. Thanks for the suggestion. Will address, and add a new unit test for it.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237341854

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
+      try {
+        logDebug(s"Removing completed file $curPath")
+        fs.delete(curPath, false)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to remove $curPath / skip removing file.", e)
+      }
+    }
+
+    val logOffset = FileStreamSourceOffset(end).logOffset
+    metadataLog.get(logOffset) match {
--- End diff --

Ah, I didn't notice that. Thanks for letting me know! Will address.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237341425

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
--- End diff --

Yeah, I guess the patch prevents the case if it works as I expect, but I'm also in favor of defensive programming, and logging would be better for end users. Will address.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237340952

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
@@ -74,6 +76,39 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

+  /**
+   * The archive directory to move completed files. The option will be only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
+   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
--- End diff --

OK, will address.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237340938

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -100,6 +101,36 @@ class FileStreamSource(
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

+  ensureNoOverlapBetweenSourceAndArchivePath()
--- End diff --

Ah yes, I missed it. Will address.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237340601

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
         "s3://a/dataset.txt"
         "s3n://a/b/dataset.txt"
         "s3a://a/b/c/dataset.txt"
+        cleanSource: option to clean up completed files after processing.
+        Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+        When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
+        Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"
+        NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enbling this option would reduce the cost to list source files which is considered as a heavy operation.
+        NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted which behavior may impact the other queries.
--- End diff --

Nice finding. I missed the case where multiple sources in the same query refer to the same directory. Will address.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237319928

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

ditto
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237314690

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
         "s3://a/dataset.txt"
         "s3n://a/b/dataset.txt"
         "s3a://a/b/c/dataset.txt"
+        cleanSource: option to clean up completed files after processing.
+        Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+        When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
+        Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"
+        NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enbling this option would reduce the cost to list source files which is considered as a heavy operation.
+        NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted which behavior may impact the other queries.
--- End diff --

NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237319903

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
--- End diff --

`val curPath = new Path(new URI(entry.path))` to make it escape/unescape the path properly. `entry.path` was created from `Path.toUri.toString`. Could you also add a unit test for special paths such as `/a/b/a b%.txt`?
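A sketch of the suggested fix (hypothetical example value; assumes `entry.path` was produced by `Path.toUri.toString`, so special characters are percent-encoded):

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// entry.path was created via Path.toUri.toString, e.g. the file
// "/a/b/a b%.txt" is stored as "file:/a/b/a%20b%25.txt".
val rawEntryPath = "file:/a/b/a%20b%25.txt"  // hypothetical value of entry.path

// Wrong: treats the escaped string as a literal path, so the resulting
// file name keeps "%20"/"%25" instead of " "/"%".
val wrongPath = new Path(rawEntryPath)

// Suggested: round-trip through java.net.URI so the percent-escapes are
// decoded back before Hadoop interprets the path.
val curPath = new Path(new URI(rawEntryPath))
```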
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237319176

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+      }
+    }
+
+    def remove(entry: FileEntry): Unit = {
+      val curPath = new Path(entry.path)
+      try {
+        logDebug(s"Removing completed file $curPath")
+        fs.delete(curPath, false)
+      } catch {
+        case NonFatal(e) =>
+          // Log to error but swallow exception to avoid process being stopped
+          logWarning(s"Fail to remove $curPath / skip removing file.", e)
+      }
+    }
+
+    val logOffset = FileStreamSourceOffset(end).logOffset
+    metadataLog.get(logOffset) match {
--- End diff --

You can use `val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)` to use the underlying cache in FileStreamSourceLog.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237200636

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
@@ -74,6 +76,39 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

+  /**
+   * The archive directory to move completed files. The option will be only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
+   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
--- End diff --

nit: could you add a method to `CleanSourceMode` to convert a string to `CleanSourceMode.Value`?
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237315173

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -100,6 +101,36 @@ class FileStreamSource(
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

+  ensureNoOverlapBetweenSourceAndArchivePath()
--- End diff --

Could you do this check only when CleanSourceMode is `ARCHIVE`?
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237315718

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +289,65 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fs.exists(newPath.getParent)) {
+          fs.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        fs.rename(curPath, newPath)
--- End diff --

It's better to also check the return value of `rename`. A user may reuse a source archive dir and cause path conflicts. We should also log this.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237320515

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -100,6 +101,36 @@ class FileStreamSource(
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

+  ensureNoOverlapBetweenSourceAndArchivePath()
+
+  private def ensureNoOverlapBetweenSourceAndArchivePath(): Unit = {
+    @tailrec
+    def removeGlob(path: Path): Path = {
+      if (path.getName.contains("*")) {
+        removeGlob(path.getParent)
+      } else {
+        path
+      }
+    }
+
+    sourceOptions.sourceArchiveDir match {
+      case None =>
+      case Some(archiveDir) =>
+        val sourceUri = removeGlob(qualifiedBasePath).toUri
+        val archiveUri = new Path(archiveDir).toUri
+
+        val sourcePath = sourceUri.getPath
+        val archivePath = archiveUri.getPath
--- End diff --

We need to use `fs.makeQualified` to turn all user-provided paths into absolute paths, as the user may pass a relative path.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r237314459

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
         "s3://a/dataset.txt"
         "s3n://a/b/dataset.txt"
         "s3a://a/b/c/dataset.txt"
+        cleanSource: option to clean up completed files after processing.
+        Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+        When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
+        Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"
+        NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enbling this option would reduce the cost to list source files which is considered as a heavy operation.
+        NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted which behavior may impact the other queries.
--- End diff --

NOTE 2: The source path should not be used from multiple **sources or** queries when enabling this option, because source files will be moved or deleted which behavior may impact the other **sources and** queries.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r235632761

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
         "s3://a/dataset.txt"
         "s3n://a/b/dataset.txt"
         "s3a://a/b/c/dataset.txt"
+        cleanSource: option to clean up completed files after processing.
+        Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".
+        When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.
--- End diff --

Yeah, I guess you're right. I'll add a check in the initialization of FileStreamSource.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r235632872

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
@@ -74,6 +76,43 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

+  /**
+   * The archive directory to move completed files. The option will be only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
+   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
+      .toUpperCase(Locale.ROOT)
+
+    val matchedModeOpt = CleanSourceMode.values.find(_.toString == modeStrOption)
+    if (matchedModeOpt.isEmpty) {
--- End diff --

Will address.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r235632809

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +258,64 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      if (!fs.exists(newPath.getParent)) {
--- End diff --

Nice finding. Will address.
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r235314493

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -257,16 +258,64 @@ class FileStreamSource(
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+      val curPath = new Path(entry.path)
+      val curPathUri = curPath.toUri
+
+      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+      if (!fs.exists(newPath.getParent)) {
--- End diff --

These fs operations can also throw exceptions. Why not cover them with the try as well?
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22952#discussion_r235312035

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
@@ -74,6 +76,43 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

+  /**
+   * The archive directory to move completed files. The option will be only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
+   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
+      .toUpperCase(Locale.ROOT)
+
+    val matchedModeOpt = CleanSourceMode.values.find(_.toString == modeStrOption)
+    if (matchedModeOpt.isEmpty) {
--- End diff --

This can be simplified to something like:

```
matchedModeOpt match {
  case None =>
    throw new IllegalArgumentException(s"Invalid mode for clean source option $modeStrOption." +
      s" Must be one of ${CleanSourceMode.values.mkString(",")}")
  case Some(matchedMode) =>
    if (matchedMode == CleanSourceMode.ARCHIVE && sourceArchiveDir.isEmpty) {
      throw new IllegalArgumentException("Archive mode must be used with 'sourceArchiveDir' " +
        "option.")
    }
    matchedMode
}
```