GitHub user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237341425
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      try {
    +        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
    +        if (!fs.exists(newPath.getParent)) {
    +          fs.mkdirs(newPath.getParent)
    +        }
    +
    +        logDebug(s"Archiving completed file $curPath to $newPath")
    +        fs.rename(curPath, newPath)
    --- End diff --
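In the `move` helper from the diff, the archive destination is `baseArchiveDirPath` concatenated with the path component of the source URI, so the scheme and authority are dropped and the original directory layout is reproduced under the archive directory. A minimal Scala sketch of just that mapping, using `java.net.URI` as a stand-in for Hadoop's `Path.toUri` (`archivePath` is a hypothetical name, not part of the patch):

```scala
import java.net.URI

object ArchivePathSketch {
  // Hypothetical helper mirroring `new Path(baseArchiveDirPath + curPathUri.getPath)`
  // from the diff. URI.getPath keeps only the path component, so the scheme
  // ("hdfs", "file") and authority ("nn:8020") are discarded.
  def archivePath(entryPath: String, baseArchiveDirPath: String): String =
    baseArchiveDirPath + new URI(entryPath).getPath
}

// Usage:
// ArchivePathSketch.archivePath("hdfs://nn:8020/data/in/part-0000.txt", "/archive")
//   == "/archive/data/in/part-0000.txt"
```

One consequence of this mapping, at least in the sketch: two sources that differ only in authority (for example `hdfs://nn1/data/a` and `hdfs://nn2/data/a`) map to the same archive path.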
    
    Yeah, I think the patch prevents that case if it works as I expect, but I'm also in favor of defensive programming, and logging would be better for end users. Will address.

