Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21257#discussion_r197173180 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -235,4 +244,41 @@ class HadoopMapReduceCommitProtocol( tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false) } } + + /** + * For now, just record the files to be deleted. + */ + override def deleteWithJob(fs: FileSystem, path: Path, + canDeleteNow: Boolean = true): Boolean = { + if (canDeleteNow) { + super.deleteWithJob(fs, path) + } else { + val set = if (pathsToDelete.contains(fs)) { + pathsToDelete(fs) + } else { + new mutable.HashSet[Path]() + } + + set.add(path) + pathsToDelete.put(fs, set) + true + } + } + + private def cleanPathToDelete(): Unit = { + // first delete the files that were marked for deletion + for (fs <- pathsToDelete.keys) { + for (path <- pathsToDelete(fs)) { + try { + if (!fs.delete(path, true)) { + logWarning(s"Delete path ${path} fail at job commit time") + } + } catch { + case ex: IOException => + throw new IOException(s"Unable to clear output " + + s"file ${path} at job commit time", ex) --- End diff -- recommend including ex.toString() in the new exception raised, as child exception text can often get lost
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org