[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-12-16 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r358624511
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +341,96 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] trait FileStreamSourceCleaner {
+    def clean(entry: FileEntry): Unit
+  }
+
+  private[sql] object FileStreamSourceCleaner {
+    def apply(
+        fileSystem: FileSystem,
+        sourcePath: Path,
+        option: FileStreamOptions,
+        hadoopConf: Configuration): Option[FileStreamSourceCleaner] = option.cleanSource match {
+      case CleanSourceMode.ARCHIVE =>
+        require(option.sourceArchiveDir.isDefined)
+        val path = new Path(option.sourceArchiveDir.get)
+        val archiveFs = path.getFileSystem(hadoopConf)
+        val qualifiedArchivePath = archiveFs.makeQualified(path)
+        Some(new SourceFileArchiver(fileSystem, sourcePath, archiveFs, qualifiedArchivePath))
+
+      case CleanSourceMode.DELETE =>
+        Some(new SourceFileRemover(fileSystem))
+
+      case _ => None
+    }
+  }
+
+  private[sql] class SourceFileArchiver(
+      fileSystem: FileSystem,
+      sourcePath: Path,
+      baseArchiveFileSystem: FileSystem,
+      baseArchivePath: Path) extends FileStreamSourceCleaner with Logging {
+    assertParameters()
+
+    private def assertParameters(): Unit = {
+      require(fileSystem.getUri == baseArchiveFileSystem.getUri, "Base archive path is located " +
+        s"on a different file system than the source files. source path: $sourcePath" +
+        s" / base archive path: $baseArchivePath")
+
+      /**
+       * FileStreamSource reads the files which one of below conditions is met:
+       * 1) file itself is matched with source path
+       * 2) parent directory is matched with source path
 
 Review comment:
   FYI, just filed https://issues.apache.org/jira/browse/SPARK-30281 and raised a patch picking option 2: #26845


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2020-01-01 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r358624511
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +341,96 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] trait FileStreamSourceCleaner {
+    def clean(entry: FileEntry): Unit
+  }
+
+  private[sql] object FileStreamSourceCleaner {
+    def apply(
+        fileSystem: FileSystem,
+        sourcePath: Path,
+        option: FileStreamOptions,
+        hadoopConf: Configuration): Option[FileStreamSourceCleaner] = option.cleanSource match {
+      case CleanSourceMode.ARCHIVE =>
+        require(option.sourceArchiveDir.isDefined)
+        val path = new Path(option.sourceArchiveDir.get)
+        val archiveFs = path.getFileSystem(hadoopConf)
+        val qualifiedArchivePath = archiveFs.makeQualified(path)
+        Some(new SourceFileArchiver(fileSystem, sourcePath, archiveFs, qualifiedArchivePath))
+
+      case CleanSourceMode.DELETE =>
+        Some(new SourceFileRemover(fileSystem))
+
+      case _ => None
+    }
+  }
+
+  private[sql] class SourceFileArchiver(
+      fileSystem: FileSystem,
+      sourcePath: Path,
+      baseArchiveFileSystem: FileSystem,
+      baseArchivePath: Path) extends FileStreamSourceCleaner with Logging {
+    assertParameters()
+
+    private def assertParameters(): Unit = {
+      require(fileSystem.getUri == baseArchiveFileSystem.getUri, "Base archive path is located " +
+        s"on a different file system than the source files. source path: $sourcePath" +
+        s" / base archive path: $baseArchivePath")
+
+      /**
+       * FileStreamSource reads the files which one of below conditions is met:
+       * 1) file itself is matched with source path
+       * 2) parent directory is matched with source path
 
 Review comment:
   FYI, just filed https://issues.apache.org/jira/browse/SPARK-30281 and raised a patch picking option 2: #26920


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-14 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r334714556
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -258,16 +264,33 @@ class FileStreamSource(
   * equal to `end` and will only request offsets greater than `end` in the future.
   */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    val logOffset = FileStreamSourceOffset(end).logOffset
+
+    if (sourceOptions.cleanSource != CleanSourceMode.NO_OP) {
 
 Review comment:
   That was suggested earlier and we agreed to handle it in a follow-up issue - this PR has just been sitting longer than expected. Could we address it in the follow-up?

   Btw, if we don't guarantee the cleanup and do it on a best-effort basis (which we already do), it wouldn't matter much to run it on a background thread - except for the problem you've mentioned. That could be remedied by retaining a maximum number of paths to clean up (yes, still "best effort"; a rough sketch follows below).
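
   A rough sketch of what such a bounded, best-effort background cleanup could look like. This is illustrative only - `BestEffortCleanupQueue` and its names are hypothetical and not part of this PR:

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}

// Hypothetical sketch: best-effort cleanup on a single background thread,
// bounded by a maximum number of retained paths so a slow file system
// cannot grow the backlog without limit.
class BestEffortCleanupQueue(maxPendingPaths: Int, cleanupFn: String => Unit) {
  private val pending = new ArrayBlockingQueue[String](maxPendingPaths)
  private val executor = Executors.newSingleThreadExecutor()

  executor.submit(new Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          val path = pending.poll(1, TimeUnit.SECONDS)
          // cleanupFn is expected to log and swallow its own errors
          if (path != null) cleanupFn(path)
        }
      } catch {
        case _: InterruptedException => // stop() was called; just exit
      }
    }
  })

  // offer() returns false when the queue is full; the path is simply dropped,
  // which is acceptable because cleanup is best effort anyway.
  def submit(path: String): Boolean = pending.offer(path)

  def stop(): Unit = executor.shutdownNow()
}
```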


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-14 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r334718819
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +353,129 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchivePathString: Option[String]) extends Logging {
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val baseArchivePath: Option[Path] = baseArchivePathString.map(new 
Path(_))
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  val curPath = new Path(new URI(entry.path))
+  val curPathUri = curPath.toUri
+
+  val newPath = buildArchiveFilePath(curPathUri)
+
+  if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+logWarning(s"Fail to move $curPath to $newPath - destination matches " 
+
+  s"to source path/pattern. Skip moving file.")
+  } else {
+doArchive(curPath, newPath)
+  }
+}
+
+def delete(entry: FileEntry): Unit = {
+  val curPath = new Path(new URI(entry.path))
+  try {
+logDebug(s"Removing completed file $curPath")
+
+if (!fileSystem.delete(curPath, false)) {
+  logWarning(s"Fail to remove $curPath / skip removing file.")
+}
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being stopped
+  logWarning(s"Fail to remove $curPath / skip removing file.", e)
+  }
+}
+
+private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+  val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+  var currentPath = sourcePath
+  while (!currentPath.isRoot) {
+filters += new GlobFilter(currentPath.getName)
+currentPath = currentPath.getParent
+  }
+
+  filters.toList
+}
+
+private def buildArchiveFilePath(pathUri: URI): Path = {
+  require(baseArchivePathString.isDefined)
+  val baseArchivePathStr = baseArchivePathString.get
+  val normalizedBaseArchiveDirPath = if (baseArchivePathStr.endsWith("/")) 
{
+baseArchivePathStr.substring(0, baseArchivePathStr.length - 1)
+  } else {
+baseArchivePathStr
+  }
+
+  new Path(normalizedBaseArchiveDirPath + pathUri.getPath)
+}
+
+private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): 
Boolean = {
 
 Review comment:
   > My guess is that it's checking whether the archive dir is under the source directory?
   
   It checks whether the destination of the archive "file" will be under the source path (which contains a glob), to prevent the possibility of overwriting it or re-reading it as input.
   
   So the method name actually says it all - if `SourcePattern` isn't a familiar term, I'll change it to `SourcePath`. If we prefer `MatchedWith` over `MatchedAgainst`, I'll change that too.
   
   If we agree the method name explains what it does, we might be able to skip adding a doc comment - especially since it's a private method.
   
   > This looks way too complicated for a simple check like that.
   
   The method is complicated for two reasons:
   
   1) There's a tricky part in FileStreamSource: it doesn't only pick up files that match the source path, it also picks up files whose parent directory matches the source path. So we have to consider both cases: 1) the file itself matches, 2) the parent directory matches (see the sketch below this comment).
   
   Please refer to the comments in the test code:
   
   https://github.com/apache/spark/blob/bd8da3799dd160771ebb3ea55b7678b644248425/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala#L736-L778
   
   2) Matching against the glob pattern is costly, so we want to avoid it by leveraging known information where possible. For example, when a file is moved to the archive directory, the destination path retains the input file's path as a suffix, so the destination path can't match the source path if the archive directory's depth is greater than 2. (Neither the destination file nor its parent directory can match the source path.)
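
   A minimal sketch of the two match conditions, using Hadoop's `GlobFilter` as the PR does. The paths and the `*.txt` pattern are made up for illustration:

```scala
import org.apache.hadoop.fs.{GlobFilter, Path}

// GlobFilter matches only against a path's name, not the full path.
val filter = new GlobFilter("*.txt")

val fileItself = new Path("/data/part-0001.txt")
val fileUnderMatchedDir = new Path("/data/2019-10-14.txt/part-0001")

// 1) the file itself matches the source pattern
val case1 = filter.accept(fileItself)                      // true
// 2) the parent directory matches the source pattern, so the file is read too
val case2 = filter.accept(fileUnderMatchedDir.getParent)   // true
```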


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark

[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-14 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r334718819
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +353,129 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchivePathString: Option[String]) extends Logging {
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val baseArchivePath: Option[Path] = baseArchivePathString.map(new 
Path(_))
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  val curPath = new Path(new URI(entry.path))
+  val curPathUri = curPath.toUri
+
+  val newPath = buildArchiveFilePath(curPathUri)
+
+  if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+logWarning(s"Fail to move $curPath to $newPath - destination matches " 
+
+  s"to source path/pattern. Skip moving file.")
+  } else {
+doArchive(curPath, newPath)
+  }
+}
+
+def delete(entry: FileEntry): Unit = {
+  val curPath = new Path(new URI(entry.path))
+  try {
+logDebug(s"Removing completed file $curPath")
+
+if (!fileSystem.delete(curPath, false)) {
+  logWarning(s"Fail to remove $curPath / skip removing file.")
+}
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being stopped
+  logWarning(s"Fail to remove $curPath / skip removing file.", e)
+  }
+}
+
+private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+  val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+  var currentPath = sourcePath
+  while (!currentPath.isRoot) {
+filters += new GlobFilter(currentPath.getName)
+currentPath = currentPath.getParent
+  }
+
+  filters.toList
+}
+
+private def buildArchiveFilePath(pathUri: URI): Path = {
+  require(baseArchivePathString.isDefined)
+  val baseArchivePathStr = baseArchivePathString.get
+  val normalizedBaseArchiveDirPath = if (baseArchivePathStr.endsWith("/")) 
{
+baseArchivePathStr.substring(0, baseArchivePathStr.length - 1)
+  } else {
+baseArchivePathStr
+  }
+
+  new Path(normalizedBaseArchiveDirPath + pathUri.getPath)
+}
+
+private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): 
Boolean = {
 
 Review comment:
   > My guess is that it's checking whether the archive dir is under the source directory?
   
   It checks whether the destination of the archive "file" will be under the source path (which contains a glob), to prevent the possibility of overwriting it or re-reading it as input.
   
   So the method name actually says it all - if `SourcePattern` isn't a familiar term, I'll change it to `SourcePath`. If we prefer `MatchedWith` over `MatchedAgainst`, I'll change that too.
   
   If we agree the method name explains what it does, we might be able to skip adding a doc comment - especially since it's a private method. It may be a question of where is the best place to explain the details - I've added comments near the code lines, but I can move them into scaladoc if we prefer.
   
   > This looks way too complicated for a simple check like that.
   
   The method is complicated for two reasons:
   
   1) There's a tricky part in FileStreamSource: it doesn't only pick up files that match the source path, it also picks up files whose parent directory matches the source path. So we have to consider both cases: 1) the file itself matches, 2) the parent directory matches.
   
   Please refer to the comments in the test code:
   
   https://github.com/apache/spark/blob/bd8da3799dd160771ebb3ea55b7678b644248425/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala#L736-L778
   
   2) Matching against the glob pattern is costly, so we want to avoid it by leveraging known information where possible. For example, when a file is moved to the archive directory, the destination path retains the input file's path as a suffix, so the destination path can't match the source path if the archive directory's depth is greater than 2. (Neither the destination file nor its parent directory can match the source path.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-

[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-14 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r334720763
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +353,129 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchivePathString: Option[String]) extends Logging {
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val baseArchivePath: Option[Path] = baseArchivePathString.map(new 
Path(_))
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  val curPath = new Path(new URI(entry.path))
+  val curPathUri = curPath.toUri
+
+  val newPath = buildArchiveFilePath(curPathUri)
+
+  if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+logWarning(s"Fail to move $curPath to $newPath - destination matches " 
+
+  s"to source path/pattern. Skip moving file.")
+  } else {
+doArchive(curPath, newPath)
+  }
+}
+
+def delete(entry: FileEntry): Unit = {
+  val curPath = new Path(new URI(entry.path))
+  try {
+logDebug(s"Removing completed file $curPath")
+
+if (!fileSystem.delete(curPath, false)) {
+  logWarning(s"Fail to remove $curPath / skip removing file.")
+}
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being stopped
+  logWarning(s"Fail to remove $curPath / skip removing file.", e)
+  }
+}
+
+private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+  val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+  var currentPath = sourcePath
+  while (!currentPath.isRoot) {
+filters += new GlobFilter(currentPath.getName)
+currentPath = currentPath.getParent
+  }
+
+  filters.toList
+}
+
+private def buildArchiveFilePath(pathUri: URI): Path = {
+  require(baseArchivePathString.isDefined)
+  val baseArchivePathStr = baseArchivePathString.get
+  val normalizedBaseArchiveDirPath = if (baseArchivePathStr.endsWith("/")) 
{
+baseArchivePathStr.substring(0, baseArchivePathStr.length - 1)
+  } else {
+baseArchivePathStr
+  }
+
+  new Path(normalizedBaseArchiveDirPath + pathUri.getPath)
 
 Review comment:
   > Is pathUri guaranteed to contain an absolute path?
   
   Yes, FileStreamSource qualifies the path when listing files.
   
   > Because then this is basically new Path(parentPath, pathUri.toString.substring(1)), which also may mean you can clean up a bunch of this code.
   
   `.getPath` is used because the URI could have a scheme part, in which case `pathUri.toString.substring(1)` wouldn't work as expected (see the sketch below).
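
   An illustrative sketch of that point, with a made-up fully qualified URI:

```scala
import java.net.URI

// When the listed path is fully qualified, the URI carries a scheme (and
// possibly an authority), so chopping one leading character off the string
// form would not yield the path component.
val qualified = new URI("hdfs://namenode:8020/data/2019/part-0001.txt")

val path = qualified.getPath                   // "/data/2019/part-0001.txt"
val broken = qualified.toString.substring(1)   // "dfs://namenode:8020/..." - not a path
```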


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-15 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r335217980
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +353,129 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchivePathString: Option[String]) extends Logging {
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val baseArchivePath: Option[Path] = baseArchivePathString.map(new 
Path(_))
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  val curPath = new Path(new URI(entry.path))
+  val curPathUri = curPath.toUri
+
+  val newPath = buildArchiveFilePath(curPathUri)
+
+  if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+logWarning(s"Fail to move $curPath to $newPath - destination matches " 
+
+  s"to source path/pattern. Skip moving file.")
+  } else {
+doArchive(curPath, newPath)
+  }
+}
+
+def delete(entry: FileEntry): Unit = {
+  val curPath = new Path(new URI(entry.path))
+  try {
+logDebug(s"Removing completed file $curPath")
+
+if (!fileSystem.delete(curPath, false)) {
+  logWarning(s"Fail to remove $curPath / skip removing file.")
+}
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being stopped
+  logWarning(s"Fail to remove $curPath / skip removing file.", e)
+  }
+}
+
+private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+  val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+  var currentPath = sourcePath
+  while (!currentPath.isRoot) {
+filters += new GlobFilter(currentPath.getName)
+currentPath = currentPath.getParent
+  }
+
+  filters.toList
+}
+
+private def buildArchiveFilePath(pathUri: URI): Path = {
+  require(baseArchivePathString.isDefined)
+  val baseArchivePathStr = baseArchivePathString.get
+  val normalizedBaseArchiveDirPath = if (baseArchivePathStr.endsWith("/")) 
{
+baseArchivePathStr.substring(0, baseArchivePathStr.length - 1)
+  } else {
+baseArchivePathStr
+  }
+
+  new Path(normalizedBaseArchiveDirPath + pathUri.getPath)
+}
+
+private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): 
Boolean = {
 
 Review comment:
   I didn't mean it shouldn't be explained in the method's scaladoc - I meant the method already has inline comments near the code lines that explain it. Sounds like the preference is to add scaladoc in this case; I'll add one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-15 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r335220635
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +353,129 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchivePathString: Option[String]) extends Logging {
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val baseArchivePath: Option[Path] = baseArchivePathString.map(new 
Path(_))
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  val curPath = new Path(new URI(entry.path))
+  val curPathUri = curPath.toUri
+
+  val newPath = buildArchiveFilePath(curPathUri)
+
+  if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+logWarning(s"Fail to move $curPath to $newPath - destination matches " 
+
+  s"to source path/pattern. Skip moving file.")
+  } else {
+doArchive(curPath, newPath)
+  }
+}
+
+def delete(entry: FileEntry): Unit = {
+  val curPath = new Path(new URI(entry.path))
+  try {
+logDebug(s"Removing completed file $curPath")
+
+if (!fileSystem.delete(curPath, false)) {
+  logWarning(s"Fail to remove $curPath / skip removing file.")
+}
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being stopped
+  logWarning(s"Fail to remove $curPath / skip removing file.", e)
+  }
+}
+
+private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+  val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+  var currentPath = sourcePath
+  while (!currentPath.isRoot) {
+filters += new GlobFilter(currentPath.getName)
+currentPath = currentPath.getParent
+  }
+
+  filters.toList
+}
+
+private def buildArchiveFilePath(pathUri: URI): Path = {
+  require(baseArchivePathString.isDefined)
+  val baseArchivePathStr = baseArchivePathString.get
+  val normalizedBaseArchiveDirPath = if (baseArchivePathStr.endsWith("/")) 
{
+baseArchivePathStr.substring(0, baseArchivePathStr.length - 1)
+  } else {
+baseArchivePathStr
+  }
+
+  new Path(normalizedBaseArchiveDirPath + pathUri.getPath)
 
 Review comment:
   Ah, I see. I get the point - make the child relative (by removing the leading '/') and let the Path constructor resolve it. I hadn't realized that trick would work. Nice suggestion (see the sketch below).
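
   A small sketch of the suggestion, using the example paths from the docs under review (illustrative only):

```scala
import org.apache.hadoop.fs.Path

// Strip the leading '/' so the input path becomes a relative child, and let
// the Path constructor resolve it against the archive directory.
val baseArchivePath = new Path("/archived/here")
val inputPath = new Path("/a/b/dataset.txt")

val archived = new Path(baseArchivePath, inputPath.toUri.getPath.stripPrefix("/"))
// archived is "/archived/here/a/b/dataset.txt"
```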


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-23 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338334551
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +353,137 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchivePathString: Option[String]) extends Logging {
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val baseArchivePath: Option[Path] = baseArchivePathString.map(new 
Path(_))
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  val curPath = new Path(new URI(entry.path))
+  val curPathUri = curPath.toUri
+
+  val newPath = buildArchiveFilePath(curPathUri)
+
+  if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+logWarning(s"Fail to move $curPath to $newPath - destination matches " 
+
+  s"to source path/pattern. Skip moving file.")
+  } else {
+doArchive(curPath, newPath)
+  }
+}
+
+def delete(entry: FileEntry): Unit = {
+  val curPath = new Path(new URI(entry.path))
+  try {
+logDebug(s"Removing completed file $curPath")
+
+if (!fileSystem.delete(curPath, false)) {
+  logWarning(s"Fail to remove $curPath / skip removing file.")
+}
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being stopped
+  logWarning(s"Fail to remove $curPath / skip removing file.", e)
+  }
+}
+
+private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+  val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+  var currentPath = sourcePath
+  while (!currentPath.isRoot) {
+filters += new GlobFilter(currentPath.getName)
+currentPath = currentPath.getParent
+  }
+
+  filters.toList
+}
+
+private def buildArchiveFilePath(pathUri: URI): Path = {
+  require(baseArchivePath.isDefined)
+  new Path(baseArchivePath.get, pathUri.getPath.stripPrefix("/"))
+}
+
+/**
+ * This method checks whether the destination of archive file will be 
under the source path
+ * (which contains glob) to prevent the possibility of 
overwriting/re-reading as input.
+ *
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so this method leverages above 
information to prune
+ * the cases where the file cannot be matched with source path. For 
example, when file is
+ * moved to archive directory, destination path will retain input file's 
path as suffix,
+ * so destination path can't be matched with source path if archive 
directory's depth is
+ * longer than 2, as neither file nor parent directory of destination path 
can be matched
+ * with source path.
+ */
+private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): 
Boolean = {
+  if (baseArchivePath.get.depth() > 2) {
 
 Review comment:
   This optimization relies on a simple fact: a glob path only covers files at a specific depth (there's no notation like `**` that would match multiple directory depths), so the depth of possibly matched paths is deterministic if you know the glob path.
   
   Given that we move the source file under the archive path, which is a directory, the depth is increased by 1. And since FileStreamSource allows the "parent directory" of a source file to match the glob path, the possible matching depths are "the depth of the glob path" + 1 and "the depth of the glob path" + 2. Does that make sense? (A rough sketch follows below.)
   
   > is there a reason why you need to check this for every archived path? Can't you, during config validation, perform this check once, with some path generated to match the source pattern, and then declare that there is no conflict between the archive path and the source path?
   
   No, that's what I've been missing. You're right, it can be determined once at initialization. Nice finding.
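
   A rough sketch of the depth-based pruning, following the range check in the PR's `isArchiveFileMatchedAgainstSourcePattern` (quoted later in this thread); the paths here are made up and `Path.depth()` is Hadoop's path-element count:

```scala
import org.apache.hadoop.fs.Path

// A glob contains no "**", so it only matches paths at a fixed depth; the
// archived file keeps the input path as a suffix, so once the archive dir is
// deep enough neither the archived file nor its parent can reach that depth.
val sourceGlob = new Path("/data/*/*.txt")
val archivedFile = new Path("/archived/here/data/2019/part-0001.txt")

val depthOfGlob = sourceGlob.depth()
val depthOfArchivedFile = archivedFile.depth()

// matching is only possible at depthOfGlob (file itself matches) or
// depthOfGlob + 1 (file whose parent directory matches the glob)
val canPossiblyMatch =
  depthOfArchivedFile == depthOfGlob || depthOfArchivedFile == depthOfGlob + 1
```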


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-23 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338336181
 
 

 ##
 File path: docs/structured-streaming-programming-guide.md
 ##
 @@ -546,6 +546,13 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+cleanSource: option to clean up completed files after processing.
+Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
+When "archive" is provided, the additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of the source path, to ensure archived files are never picked up as new source files again.
+Spark will move source files while preserving their own path. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
+NOTE: Both archiving (via moving) and deleting completed files introduce overhead (slow down) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which is otherwise a heavy operation.
+NOTE 2: The source path should not be used by multiple sources or queries when this option is enabled, because source files will be moved or deleted, which may impact the other sources and queries.
 
 Review comment:
   This is a reflection of previous review comment 
https://github.com/apache/spark/pull/22952#discussion_r237314459
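
   A usage sketch of the options documented in the excerpt above, assuming an active SparkSession named `spark`; the paths are the placeholders from the guide:

```scala
// Reads text files from /a/b and archives each completed file under
// /archived/here, as described in the guide excerpt under review.
val stream = spark.readStream
  .format("text")
  .option("cleanSource", "archive")             // or "delete" / "off"
  .option("sourceArchiveDir", "/archived/here") // required when cleanSource = "archive"
  .load("/a/b")
```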
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-23 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338338952
 
 

 ##
 File path: docs/structured-streaming-programming-guide.md
 ##
 @@ -546,6 +546,13 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+cleanSource: option to clean up completed files after processing.
+Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
+When "archive" is provided, the additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of the source path, to ensure archived files are never picked up as new source files again.
+Spark will move source files while preserving their own path. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
+NOTE: Both archiving (via moving) and deleting completed files introduce overhead (slow down) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which is otherwise a heavy operation.
+NOTE 2: The source path should not be used by multiple sources or queries when this option is enabled, because source files will be moved or deleted, which may impact the other sources and queries.
 
 Review comment:
   Revisiting the comment thoroughly - yes, you're right, that was the intention. OK to remove the latter part.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-23 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338347720
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +353,137 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchivePathString: Option[String]) extends Logging {
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val baseArchivePath: Option[Path] = baseArchivePathString.map(new 
Path(_))
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  val curPath = new Path(new URI(entry.path))
+  val curPathUri = curPath.toUri
+
+  val newPath = buildArchiveFilePath(curPathUri)
+
+  if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+logWarning(s"Fail to move $curPath to $newPath - destination matches " 
+
+  s"to source path/pattern. Skip moving file.")
+  } else {
+doArchive(curPath, newPath)
+  }
+}
+
+def delete(entry: FileEntry): Unit = {
+  val curPath = new Path(new URI(entry.path))
+  try {
+logDebug(s"Removing completed file $curPath")
+
+if (!fileSystem.delete(curPath, false)) {
+  logWarning(s"Fail to remove $curPath / skip removing file.")
+}
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being stopped
+  logWarning(s"Fail to remove $curPath / skip removing file.", e)
+  }
+}
+
+private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+  val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+  var currentPath = sourcePath
+  while (!currentPath.isRoot) {
+filters += new GlobFilter(currentPath.getName)
+currentPath = currentPath.getParent
+  }
+
+  filters.toList
+}
+
+private def buildArchiveFilePath(pathUri: URI): Path = {
+  require(baseArchivePath.isDefined)
+  new Path(baseArchivePath.get, pathUri.getPath.stripPrefix("/"))
+}
+
+/**
+ * This method checks whether the destination of archive file will be 
under the source path
+ * (which contains glob) to prevent the possibility of 
overwriting/re-reading as input.
+ *
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so this method leverages above 
information to prune
+ * the cases where the file cannot be matched with source path. For 
example, when file is
+ * moved to archive directory, destination path will retain input file's 
path as suffix,
+ * so destination path can't be matched with source path if archive 
directory's depth is
+ * longer than 2, as neither file nor parent directory of destination path 
can be matched
+ * with source path.
+ */
+private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): 
Boolean = {
+  if (baseArchivePath.get.depth() > 2) {
+// there's no chance for archive file to be matched against source 
pattern
+return false
+  }
+
+  var matched = true
+
+  // new path will never match against source path when the depth is not a 
range of
+  // the depth of source path ~ (the depth of source path + 1)
+  // because the source files are picked when they match against source 
pattern or
+  // their parent directories match against source pattern
+  val depthSourcePattern = sourceGlobFilters.length
+  val depthArchiveFile = archiveFile.depth()
+
+  // we already checked against the depth of archive path, but rechecking 
wouldn't hurt
+  if (depthArchiveFile < depthSourcePattern || depthArchiveFile > 
depthSourcePattern + 1) {
+// never matched
+matched = false
+  } else {
+var pathToCompare = if (depthArchiveFile == depthSourcePattern + 1) {
+  archiveFile.getParent
+} else {
+  archiveFile
+}
+
+// Now pathToCompare should have same depth as sourceGlobFilters.length
+var index = 0
+do {
+  // GlobFilter only matches against its name, not full path so it's 
safe to compare
+  if (!sourceGlobFilters(index).accept(pathToCompare)) {
+matched = false
+  } else {
+pathToCompare = pathToCompare.getParent
+index += 1
+  }
+} while (matched && !pathToCompare.isRoot)
+  }
+
+  matched
+}
+
+private def doArchive(sourcePath: Path, archivePath: Path): Unit = {
+ 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-23 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338334551
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +353,137 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchivePathString: Option[String]) extends Logging {
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val baseArchivePath: Option[Path] = baseArchivePathString.map(new 
Path(_))
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  val curPath = new Path(new URI(entry.path))
+  val curPathUri = curPath.toUri
+
+  val newPath = buildArchiveFilePath(curPathUri)
+
+  if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+logWarning(s"Fail to move $curPath to $newPath - destination matches " 
+
+  s"to source path/pattern. Skip moving file.")
+  } else {
+doArchive(curPath, newPath)
+  }
+}
+
+def delete(entry: FileEntry): Unit = {
+  val curPath = new Path(new URI(entry.path))
+  try {
+logDebug(s"Removing completed file $curPath")
+
+if (!fileSystem.delete(curPath, false)) {
+  logWarning(s"Fail to remove $curPath / skip removing file.")
+}
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being stopped
+  logWarning(s"Fail to remove $curPath / skip removing file.", e)
+  }
+}
+
+private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+  val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+  var currentPath = sourcePath
+  while (!currentPath.isRoot) {
+filters += new GlobFilter(currentPath.getName)
+currentPath = currentPath.getParent
+  }
+
+  filters.toList
+}
+
+private def buildArchiveFilePath(pathUri: URI): Path = {
+  require(baseArchivePath.isDefined)
+  new Path(baseArchivePath.get, pathUri.getPath.stripPrefix("/"))
+}
+
+/**
+ * This method checks whether the destination of archive file will be 
under the source path
+ * (which contains glob) to prevent the possibility of 
overwriting/re-reading as input.
+ *
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so this method leverages above 
information to prune
+ * the cases where the file cannot be matched with source path. For 
example, when file is
+ * moved to archive directory, destination path will retain input file's 
path as suffix,
+ * so destination path can't be matched with source path if archive 
directory's depth is
+ * longer than 2, as neither file nor parent directory of destination path 
can be matched
+ * with source path.
+ */
+private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): 
Boolean = {
+  if (baseArchivePath.get.depth() > 2) {
 
 Review comment:
   This optimization relies on a simple fact: a glob path only covers files at a specific depth (there's no notation like `**` that would match multiple directory depths), so the depth of possibly matched paths is deterministic if you know the glob path.
   
   Once the paths are combined, the only cases where the archive path could match the glob path are 1) a direct match, only possible if the base archive path is just '/', which is depth 1, or 2) a parent-directory match, where the base archive path is a direct subdirectory under / (like `/a`), i.e. depth 2. Does that make sense?
   
   > is there a reason why you need to check this for every archived path? Can't you, during config validation, perform this check once, with some path generated to match the source pattern, and then declare that there is no conflict between the archive path and the source path?
   
   No, that's what I've been missing. You're right, it can be determined once at initialization. Nice finding.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-24 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338835194
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +362,139 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: Option[FileSystem],
+  baseArchivePath: Option[Path]) extends Logging {
+require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val sameFsSourceAndArchive: Boolean = {
 
 Review comment:
   Oh, OK. Failing the query sounds fine, and it might even be better than realizing later that the configuration doesn't work because it was misconfigured or not supported. I'll make the change.
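
   A small sketch of failing fast at initialization instead of logging a warning per file; it mirrors the `require(...)` in the `SourceFileArchiver.assertParameters` excerpt near the top of this thread (the helper name here is just illustrative):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Fail the query up front when the archive dir lives on a different file
// system than the source files, rather than skipping archival per file.
def assertSameFileSystem(
    sourceFs: FileSystem,
    sourcePath: Path,
    archiveFs: FileSystem,
    baseArchivePath: Path): Unit = {
  require(sourceFs.getUri == archiveFs.getUri,
    "Base archive path is located on a different file system than the source files. " +
      s"source path: $sourcePath / base archive path: $baseArchivePath")
}
```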


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-24 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338837229
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +362,139 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: Option[FileSystem],
+  baseArchivePath: Option[Path]) extends Logging {
+require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val sameFsSourceAndArchive: Boolean = {
+  baseArchiveFileSystem.exists { fs =>
+if (fileSystem.getUri != fs.getUri) {
+  logWarning("Base archive path is located to the different filesystem 
with source, " +
+s"which is not supported. source path: ${sourcePath} / base 
archive path: " +
+s"${baseArchivePath.get}")
+  false
+} else {
+  true
+}
+  }
+}
+
+/**
+ * This is a flag to skip matching archived path with source path.
+ *
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so this flag leverages above 
information to prune
+ * the cases where the file cannot be matched with source path. For 
example, when file is
+ * moved to archive directory, destination path will retain input file's 
path as suffix,
+ * so destination path can't be matched with source path if archive 
directory's depth is
+ * longer than 2, as neither file nor parent directory of destination path 
can be matched
+ * with source path.
+ */
+private val skipCheckingGlob: Boolean = baseArchivePath.exists(_.depth() > 
2)
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  if (sameFsSourceAndArchive) {
+val curPath = new Path(new URI(entry.path))
+val newPath = new Path(baseArchivePath.get, 
curPath.toUri.getPath.stripPrefix("/"))
+
+if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {
 
 Review comment:
   I'm not sure I understand. Could you elaborate? I agreed with your suggestion that it shouldn't need to be calculated per path, but I don't get anything beyond that.
   
   If you meant moving skipCheckingGlob into config validation, I'm not sure I like it, since it's just an optimization inside FileStreamSourceCleaner - an implementation detail - so there's no need to expose it to the outside.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-24 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338837940
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +362,139 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: Option[FileSystem],
+  baseArchivePath: Option[Path]) extends Logging {
+require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val sameFsSourceAndArchive: Boolean = {
+  baseArchiveFileSystem.exists { fs =>
+if (fileSystem.getUri != fs.getUri) {
+  logWarning("Base archive path is located to the different filesystem 
with source, " +
+s"which is not supported. source path: ${sourcePath} / base 
archive path: " +
+s"${baseArchivePath.get}")
+  false
+} else {
+  true
+}
+  }
+}
+
+/**
+ * This is a flag to skip matching archived path with source path.
+ *
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so this flag leverages above 
information to prune
+ * the cases where the file cannot be matched with source path. For 
example, when file is
+ * moved to archive directory, destination path will retain input file's 
path as suffix,
+ * so destination path can't be matched with source path if archive 
directory's depth is
+ * longer than 2, as neither file nor parent directory of destination path 
can be matched
+ * with source path.
+ */
+private val skipCheckingGlob: Boolean = baseArchivePath.exists(_.depth() > 
2)
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  if (sameFsSourceAndArchive) {
+val curPath = new Path(new URI(entry.path))
+val newPath = new Path(baseArchivePath.get, 
curPath.toUri.getPath.stripPrefix("/"))
+
+if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {
 
 Review comment:
   OK, looking at the comment again, I only understood a small part of it. Sorry about that. The source path can contain a wildcard in any subpath, so only the depth check can be optimized. Please correct me if I'm missing something here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-24 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338837940
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +362,139 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: Option[FileSystem],
+  baseArchivePath: Option[Path]) extends Logging {
+require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val sameFsSourceAndArchive: Boolean = {
+  baseArchiveFileSystem.exists { fs =>
+if (fileSystem.getUri != fs.getUri) {
+  logWarning("Base archive path is located to the different filesystem 
with source, " +
+s"which is not supported. source path: ${sourcePath} / base 
archive path: " +
+s"${baseArchivePath.get}")
+  false
+} else {
+  true
+}
+  }
+}
+
+/**
+ * This is a flag to skip matching archived path with source path.
+ *
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so this flag leverages above 
information to prune
+ * the cases where the file cannot be matched with source path. For 
example, when file is
+ * moved to archive directory, destination path will retain input file's 
path as suffix,
+ * so destination path can't be matched with source path if archive 
directory's depth is
+ * longer than 2, as neither file nor parent directory of destination path 
can be matched
+ * with source path.
+ */
+private val skipCheckingGlob: Boolean = baseArchivePath.exists(_.depth() > 
2)
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  if (sameFsSourceAndArchive) {
+val curPath = new Path(new URI(entry.path))
+val newPath = new Path(baseArchivePath.get, 
curPath.toUri.getPath.stripPrefix("/"))
+
+if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {
 
 Review comment:
   OK, looking at the comment again, I only understood a small part of it. Sorry about that. ~~The source path can contain a wildcard in any subpath, so only the depth check can be optimized. Please correct me if I'm missing something here.~~
   
   Sorry, never mind. I just got your point about further pruning the case: if baseArchivePath doesn't match the source path, there's no need to check further. Will add.
   
   Btw, I guess we still need this check if baseArchivePath does match the source path, but please let me know if you see a verification approach that would let us eliminate it entirely.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-24 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338847897
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +362,139 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: Option[FileSystem],
+  baseArchivePath: Option[Path]) extends Logging {
+require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val sameFsSourceAndArchive: Boolean = {
+  baseArchiveFileSystem.exists { fs =>
+if (fileSystem.getUri != fs.getUri) {
+  logWarning("Base archive path is located to the different filesystem 
with source, " +
+s"which is not supported. source path: ${sourcePath} / base 
archive path: " +
+s"${baseArchivePath.get}")
+  false
+} else {
+  true
+}
+  }
+}
+
+/**
+ * This is a flag to skip matching archived path with source path.
+ *
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so this flag leverages above 
information to prune
+ * the cases where the file cannot be matched with source path. For 
example, when file is
+ * moved to archive directory, destination path will retain input file's 
path as suffix,
+ * so destination path can't be matched with source path if archive 
directory's depth is
+ * longer than 2, as neither file nor parent directory of destination path 
can be matched
+ * with source path.
+ */
+private val skipCheckingGlob: Boolean = baseArchivePath.exists(_.depth() > 
2)
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  if (sameFsSourceAndArchive) {
+val curPath = new Path(new URI(entry.path))
+val newPath = new Path(baseArchivePath.get, 
curPath.toUri.getPath.stripPrefix("/"))
+
+if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {
 
 Review comment:
   > I don't fully understand the matching you explained, but my guess is that 
you could end up with an archived file called "/hello/hello1/hell" and that 
somehow matches the pattern?
   
   Exactly. We cannot know in advance whether the first component of the source 
path contains a wildcard. Yes, we could also prune the case where the first 
component of the source path doesn't contain a wildcard, but that feels 
increasingly complicated.
   
   Since we really want to simplify this, how about just requiring baseArchivePath 
to have a depth greater than 2? (I think that would be harmless in production - 
anyone hitting the restriction can simply create a subdirectory.) Then we can drop 
the pattern check entirely and throw an error in the config validation phase. 
WDYT?
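   A rough sketch of the proposed validation (illustrative only, not the final 
patch; the path value is made up):
   
   ```scala
   import org.apache.hadoop.fs.Path
   
   // validate once when options are parsed, instead of glob-matching per file;
   // Path.depth() counts path components, so "/data/archive" has depth 2 and
   // "/data/archive/done" has depth 3
   val baseArchivePath = new Path("/data/archive/done")  // hypothetical option value
   require(baseArchivePath.depth() > 2,
     s"Base archive path must be more than two directories deep, got: $baseArchivePath")
   ```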


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-25 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338960691
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +362,139 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: Option[FileSystem],
+  baseArchivePath: Option[Path]) extends Logging {
+require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val sameFsSourceAndArchive: Boolean = {
+  baseArchiveFileSystem.exists { fs =>
+if (fileSystem.getUri != fs.getUri) {
+  logWarning("Base archive path is located to the different filesystem 
with source, " +
+s"which is not supported. source path: ${sourcePath} / base 
archive path: " +
+s"${baseArchivePath.get}")
+  false
+} else {
+  true
+}
+  }
+}
+
+/**
+ * This is a flag to skip matching archived path with source path.
+ *
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so this flag leverages above 
information to prune
+ * the cases where the file cannot be matched with source path. For 
example, when file is
+ * moved to archive directory, destination path will retain input file's 
path as suffix,
+ * so destination path can't be matched with source path if archive 
directory's depth is
+ * longer than 2, as neither file nor parent directory of destination path 
can be matched
+ * with source path.
+ */
+private val skipCheckingGlob: Boolean = baseArchivePath.exists(_.depth() > 
2)
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  if (sameFsSourceAndArchive) {
+val curPath = new Path(new URI(entry.path))
+val newPath = new Path(baseArchivePath.get, 
curPath.toUri.getPath.stripPrefix("/"))
+
+if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {
 
 Review comment:
   Technically, we only need to check the pattern when the depth of the base 
archive path is 2. The depth cannot be 1, because then the archived file would 
always end up at the original source file's own path. (Yes, I was aware of this 
but forgot to use that fact for pruning.)
   
   So `depth > 2` -> no need to check, `depth == 1` -> always matches, `depth == 
2` -> need to check the pattern. We could even avoid checking per file by only 
checking whether the first directory name of the base archive path matches the 
first directory name (including the glob) of the source path. That won't guarantee 
that no archive path matches the source path - which is why I did the full pattern 
match per file - but if we really don't want per-file pattern matching, yes, we 
can do it.
   
   My concern is not really a technical one. If we want to just yell and fail the 
query instead of skipping the archiving of a file, we must make sure end users 
understand why the query failed and how to fix it. I had to explain to most 
reviewers how the pattern check works, and even then I feel reviewers aren't 
comfortable with it - I'm not sure it would work for end users, although we can 
add some guidance on how it works.
   
   So we need to simplify the condition as much as possible if we really want to 
just fail the query. The amount of explanation needed for `depth > 2` versus 
`depth >= 2` differs significantly, and the actual harm of disallowing `depth == 
2` is just that users need to create a subdirectory and use it, which doesn't seem 
critical.
   
   Does that make sense?
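   For reference, a small sketch of that three-way split (illustrative only; 
`needsPerFileGlobCheck` is a made-up name and the path value is hypothetical):
   
   ```scala
   import org.apache.hadoop.fs.Path
   
   val baseArchivePath = new Path("/data/archive")  // hypothetical value, depth 2
   
   // Path.depth() counts path components: "/archive" -> 1, "/data/archive" -> 2,
   // "/data/archive/done" -> 3
   val needsPerFileGlobCheck: Boolean = baseArchivePath.depth() match {
     case d if d > 2 => false // destination can never match the source glob
     case 2          => true  // the only case where the per-file pattern check matters
     case _          =>
       // per the reasoning above, at depth 1 the archived file would end up at
       // the original source file's own path, so reject this at validation time
       throw new IllegalArgumentException(
         s"Base archive path '$baseArchivePath' is too shallow to be used")
   }
   ```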


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-25 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r339282089
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +362,139 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: Option[FileSystem],
+  baseArchivePath: Option[Path]) extends Logging {
+require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+private val sourceGlobFilters: Seq[GlobFilter] = 
buildSourceGlobFilters(sourcePath)
+
+private val sameFsSourceAndArchive: Boolean = {
+  baseArchiveFileSystem.exists { fs =>
+if (fileSystem.getUri != fs.getUri) {
+  logWarning("Base archive path is located to the different filesystem 
with source, " +
+s"which is not supported. source path: ${sourcePath} / base 
archive path: " +
+s"${baseArchivePath.get}")
+  false
+} else {
+  true
+}
+  }
+}
+
+/**
+ * This is a flag to skip matching archived path with source path.
+ *
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so this flag leverages above 
information to prune
+ * the cases where the file cannot be matched with source path. For 
example, when file is
+ * moved to archive directory, destination path will retain input file's 
path as suffix,
+ * so destination path can't be matched with source path if archive 
directory's depth is
+ * longer than 2, as neither file nor parent directory of destination path 
can be matched
+ * with source path.
+ */
+private val skipCheckingGlob: Boolean = baseArchivePath.exists(_.depth() > 
2)
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  if (sameFsSourceAndArchive) {
+val curPath = new Path(new URI(entry.path))
+val newPath = new Path(baseArchivePath.get, 
curPath.toUri.getPath.stripPrefix("/"))
+
+if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {
 
 Review comment:
   If I were an end user and my query failed because of the glob path, I'd be 
puzzled and want to know why - but that only happens if we allow `depth == 2`, and 
I think we've agreed `depth > 2` doesn't need any further explanation. I'll apply 
it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-29 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r340308673
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +362,77 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: Option[FileSystem],
+  baseArchivePath: Option[Path]) extends Logging {
+assertParameters()
+
+private def assertParameters(): Unit = {
+  require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+  baseArchiveFileSystem.foreach { fs =>
+require(fileSystem.getUri == fs.getUri, "Base archive path is located 
on a different " +
+  s"file system than the source files. source path: $sourcePath" +
+  s" / base archive path: ${baseArchivePath.get}")
+  }
+
+  baseArchivePath.foreach { path =>
+
+/**
+ * FileStreamSource reads the files which one of below conditions is 
met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so set this requirement to 
eliminate the cases
+ * where the archive path can be matched with source path. For 
example, when file is moved
+ * to archive directory, destination path will retain input file's 
path as suffix, so
+ * destination path can't be matched with source path if archive 
directory's depth is longer
+ * than 2, as neither file nor parent directory of destination path 
can be matched with
+ * source path.
+ */
+require(path.depth() > 2, "Base archive path must have a depth of at 
least 2 " +
+  "subdirectories. e.g. '/data/archive'")
+  }
+}
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  val curPath = new Path(new URI(entry.path))
+  val newPath = new Path(baseArchivePath.get, 
curPath.toUri.getPath.stripPrefix("/"))
 
 Review comment:
   I'm revisiting the two issues and I'm not sure there's a viable workaround. It 
looks like the issue pointed out is that ":" isn't a valid character for HDFS but 
may be valid on other filesystems, so the Path API doesn't restrict it, which 
leads to the problem. Even HDFS-14762 is closed as "Won't Fix".
   
   Would this only occur with `Path(parent, child)`, while `Path(pathstr)` is 
safe? Would it work if we manually concatenated the two paths as a string and 
passed that to Path's constructor?
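   A quick way to probe that locally (purely illustrative; the namenode, 
directories and file name are made up):
   
   ```scala
   import java.net.URI
   import org.apache.hadoop.fs.Path
   
   val entryPath = "hdfs://nn/data/in/part:0001.json"       // hypothetical file with ':'
   val baseArchivePath = new Path("hdfs://nn/data/archive")  // hypothetical archive dir
   
   val curPath = new Path(new URI(entryPath))
   val suffix = curPath.toUri.getPath.stripPrefix("/")       // "data/in/part:0001.json"
   
   // current approach: the child string is re-parsed by Path, so a ':' can be
   // mistaken for a URI scheme separator depending on where it appears
   val viaTwoArgCtor = new Path(baseArchivePath, suffix)
   
   // alternative raised above: concatenate first, then use the single-arg ctor
   val viaConcat = new Path(baseArchivePath.toString + Path.SEPARATOR + suffix)
   
   println(s"two-arg ctor : $viaTwoArgCtor")
   println(s"concat + ctor: $viaConcat")
   ```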
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-29 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r340311674
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +362,77 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: Option[FileSystem],
+  baseArchivePath: Option[Path]) extends Logging {
+assertParameters()
+
+private def assertParameters(): Unit = {
+  require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+  baseArchiveFileSystem.foreach { fs =>
+require(fileSystem.getUri == fs.getUri, "Base archive path is located 
on a different " +
+  s"file system than the source files. source path: $sourcePath" +
+  s" / base archive path: ${baseArchivePath.get}")
+  }
+
+  baseArchivePath.foreach { path =>
+
+/**
+ * FileStreamSource reads the files which one of below conditions is 
met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so set this requirement to 
eliminate the cases
+ * where the archive path can be matched with source path. For 
example, when file is moved
+ * to archive directory, destination path will retain input file's 
path as suffix, so
+ * destination path can't be matched with source path if archive 
directory's depth is longer
+ * than 2, as neither file nor parent directory of destination path 
can be matched with
+ * source path.
+ */
+require(path.depth() > 2, "Base archive path must have a depth of at 
least 2 " +
+  "subdirectories. e.g. '/data/archive'")
+  }
+}
+
+def archive(entry: FileEntry): Unit = {
+  require(baseArchivePath.isDefined)
+
+  val curPath = new Path(new URI(entry.path))
+  val newPath = new Path(baseArchivePath.get, 
curPath.toUri.getPath.stripPrefix("/"))
 
 Review comment:
   Thanks for the quick feedback! I'll apply it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-31 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r341395755
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +362,77 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: Option[FileSystem],
+  baseArchivePath: Option[Path]) extends Logging {
+assertParameters()
+
+private def assertParameters(): Unit = {
+  require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+  baseArchiveFileSystem.foreach { fs =>
+require(fileSystem.getUri == fs.getUri, "Base archive path is located 
on a different " +
+  s"file system than the source files. source path: $sourcePath" +
+  s" / base archive path: ${baseArchivePath.get}")
+  }
+
+  baseArchivePath.foreach { path =>
+
+/**
+ * FileStreamSource reads the files which one of below conditions is 
met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so set this requirement to 
eliminate the cases
+ * where the archive path can be matched with source path. For 
example, when file is moved
+ * to archive directory, destination path will retain input file's 
path as suffix, so
+ * destination path can't be matched with source path if archive 
directory's depth is longer
+ * than 2, as neither file nor parent directory of destination path 
can be matched with
+ * source path.
+ */
+require(path.depth() > 2, "Base archive path must have a depth of at 
least 2 " +
 
 Review comment:
   So the message is phrased in terms of 2 "subdirectories", not a "depth" of 2 - 
"/" contributes its own level of depth. I don't think "depth" is a term end users 
are familiar with, so I'll remove the "a depth of" part.
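   For context, a hypothetical end-user configuration (the option keys 
`cleanSource` and `sourceArchiveDir` come from this PR's FileStreamOptions; the 
accepted value strings and exact semantics may differ in the final patch):
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.types.{StringType, StructField, StructType}
   
   val spark = SparkSession.builder().appName("archive-example").getOrCreate()
   val inputSchema = StructType(Seq(StructField("value", StringType)))
   
   // archive completed input files under a directory nested deep enough to
   // satisfy the subdirectory requirement discussed above
   val stream = spark.readStream
     .format("json")
     .schema(inputSchema)
     .option("cleanSource", "archive")
     .option("sourceArchiveDir", "/data/archive/completed")
     .load("/data/in/*")
   ```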


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-10-31 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r341397990
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -53,6 +55,18 @@ class FileStreamSource(
 fs.makeQualified(new Path(path))  // can contain glob patterns
   }
 
+  private val sourceCleaner: FileStreamSourceCleaner = {
 
 Review comment:
   Yeah, that's a good suggestion. Will address.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

2019-11-18 Thread GitBox
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r347673657
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##
 @@ -330,4 +341,96 @@ object FileStreamSource {
 
 def size: Int = map.size()
   }
+
+  private[sql] trait FileStreamSourceCleaner {
+def clean(entry: FileEntry): Unit
+  }
+
+  private[sql] object FileStreamSourceCleaner {
+def apply(
+fileSystem: FileSystem,
+sourcePath: Path,
+option: FileStreamOptions,
+hadoopConf: Configuration): Option[FileStreamSourceCleaner] = 
option.cleanSource match {
+  case CleanSourceMode.ARCHIVE =>
+require(option.sourceArchiveDir.isDefined)
+val path = new Path(option.sourceArchiveDir.get)
+val archiveFs = path.getFileSystem(hadoopConf)
+val qualifiedArchivePath = archiveFs.makeQualified(path)
+Some(new SourceFileArchiver(fileSystem, sourcePath, archiveFs, 
qualifiedArchivePath))
+
+  case CleanSourceMode.DELETE =>
+Some(new SourceFileRemover(fileSystem))
+
+  case _ => None
+}
+  }
+
+  private[sql] class SourceFileArchiver(
+  fileSystem: FileSystem,
+  sourcePath: Path,
+  baseArchiveFileSystem: FileSystem,
+  baseArchivePath: Path) extends FileStreamSourceCleaner with Logging {
+assertParameters()
+
+private def assertParameters(): Unit = {
+  require(fileSystem.getUri == baseArchiveFileSystem.getUri, "Base archive 
path is located " +
+s"on a different file system than the source files. source path: 
$sourcePath" +
+s" / base archive path: $baseArchivePath")
+
+  /**
+   * FileStreamSource reads the files which one of below conditions is met:
+   * 1) file itself is matched with source path
+   * 2) parent directory is matched with source path
 
 Review comment:
   @zsxwing 
   Thanks for taking the time to revisit this! The condition is based on the test 
suite for FileStreamSource, but yes, partitioned paths are missed. Nice finding. I 
need to update the condition, or simply remove the condition documented there.
   
   For `recursiveFileLookup`, it landed after this patch was written and I missed 
it. The condition was formed early this year, and recursiveFileLookup seems to 
have come in around the middle of the year.
   
   With these two cases added, FileStreamSource can read any file under the source 
path, which invalidates the depth check. There are three options to deal with 
this:
   
   1) No pattern check; just try to rename, and log it if the rename fails. 
(Caution! This doesn't prevent an archived file from being picked up as new source 
input again in a different directory.)
   2) Disallow any path as the base archive path if it matches the source path 
(glob) - here "disallow" means failing the query. Then we don't need the pattern 
check at all. If end users provide a complicated glob as the source path they may 
be puzzled about how to avoid matching it, but I'm not sure they would really want 
such a complicated path in production.
   3) Do the pattern check before renaming, even though it requires checking the 
pattern per file. We could optimize this a bit by grouping files per directory and 
checking the pattern against the directory instead of individual files (a rough 
sketch follows below). This doesn't fail the query, so end users would need to 
check whether files were left uncleaned because of the pattern check.
   
   Which one (or which couple) would be the preferred approach?
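   For option 3, a rough sketch of the per-directory optimization (illustrative 
only, not the actual patch; it assumes the component-wise GlobFilter matching used 
earlier in this PR, where each filter matches a single path name, and the function 
names are made up):
   
   ```scala
   import org.apache.hadoop.fs.{GlobFilter, Path}
   
   // Walks the destination path upwards, one name component per source glob
   // component (deepest first), mirroring the existing per-file check.
   def matchesSourcePattern(path: Path, filtersDeepestFirst: Seq[GlobFilter]): Boolean = {
     var current: Path = path
     filtersDeepestFirst.forall { filter =>
       val matched = current != null && filter.accept(current)
       if (matched) current = current.getParent
       matched
     }
   }
   
   // Grouping: evaluate the glob once per destination directory instead of once
   // per file. This covers the "parent directory matches the source path" case;
   // the "file itself matches" case would still need a per-file check (or a
   // documented caveat).
   def archiveAll(
       destinations: Seq[Path],
       filtersDeepestFirst: Seq[GlobFilter],
       rename: Path => Unit): Unit = {
     destinations.groupBy(_.getParent).foreach { case (destDir, files) =>
       if (matchesSourcePattern(destDir, filtersDeepestFirst)) {
         // archived files in this directory could be re-read as new input;
         // skip (and log) instead of renaming
       } else {
         files.foreach(rename)
       }
     }
   }
   ```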


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org