Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21257#discussion_r197173180
  
    --- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
    @@ -235,4 +244,41 @@ class HadoopMapReduceCommitProtocol(
           tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
         }
       }
    +
    +  /**
    +   * For now, just record the files to be deleted
    +   */
    +  override def deleteWithJob(fs: FileSystem, path: Path,
    +      canDeleteNow: Boolean = true): Boolean = {
    +    if (canDeleteNow) {
    +      super.deleteWithJob(fs, path)
    +    } else {
    +      val set = if (pathsToDelete.contains(fs)) {
    +        pathsToDelete(fs)
    +      } else {
    +        new mutable.HashSet[Path]()
    +      }
    +
    +      set.add(path)
    +      pathsToDelete.put(fs, set)
    +      true
    +    }
    +  }
    +
    +  private def cleanPathToDelete(): Unit = {
    +    // first, delete the files that were marked for deletion
    +    for (fs <- pathsToDelete.keys) {
    +      for (path <- pathsToDelete(fs)) {
    +        try {
    +          if (!fs.delete(path, true)) {
    +            logWarning(s"Delete path ${path} fail at job commit time")
    +          }
    +        } catch {
    +          case ex: IOException =>
    +            throw new IOException(s"Unable to clear output " +
    +                s"file ${path} at job commit time", ex)
    --- End diff --
    
    I recommend including `ex.toString()` in the message of the new exception 
being raised, as the child exception's text can often get lost.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to