Repository: spark
Updated Branches:
  refs/heads/master 50e463423 -> 27bfa9ab3
[SPARK-10721] Log warning when file deletion fails

Author: tedyu <yuzhih...@gmail.com>

Closes #8843 from tedyu/master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27bfa9ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27bfa9ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27bfa9ab

Branch: refs/heads/master
Commit: 27bfa9ab3a610e072c011fd88ee4684cea6ceb76
Parents: 50e4634
Author: tedyu <yuzhih...@gmail.com>
Authored: Wed Sep 23 10:01:28 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Sep 23 10:01:28 2015 +0100

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeSorterSpillReader.java           |  7 ++++++-
 .../scala/org/apache/spark/api/python/PythonRDD.scala  |  7 +++++--
 .../scala/org/apache/spark/deploy/RPackageUtils.scala  | 10 +++++++---
 .../spark/deploy/history/FsHistoryProvider.scala       |  8 ++++++--
 .../deploy/master/FileSystemPersistenceEngine.scala    |  5 ++++-
 .../org/apache/spark/rdd/ReliableCheckpointRDD.scala   |  4 +++-
 .../apache/spark/rdd/ReliableRDDCheckpointData.scala   |  6 ++++--
 .../apache/spark/scheduler/EventLoggingListener.scala  |  8 ++++++--
 .../spark/scheduler/cluster/SimrSchedulerBackend.scala |  4 +++-
 .../spark/shuffle/FileShuffleBlockResolver.scala       |  5 ++++-
 .../spark/shuffle/IndexShuffleBlockResolver.scala      | 13 +++++++++----
 .../scala/org/apache/spark/storage/DiskStore.scala     | 10 ++++++++--
 .../spark/util/collection/ExternalAppendOnlyMap.scala  |  8 ++++++--
 .../apache/spark/util/collection/ExternalSorter.scala  |  4 +++-
 .../network/shuffle/ExternalShuffleBlockResolver.java  |  8 ++++++--
 15 files changed, 80 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
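The change applies the same idiom in every file: capture the Boolean that java.io.File.delete() (or Hadoop's FileSystem.delete) returns and log a warning when it is false, instead of silently discarding it. A minimal self-contained sketch of the idiom (the object and method names are illustrative, and a plain SLF4J logger stands in for Spark's Logging trait):

  import java.io.File

  import org.slf4j.LoggerFactory

  object DeleteOrWarn {
    private val logger = LoggerFactory.getLogger(getClass)

    // File.delete() signals failure only through its Boolean result; ignoring
    // it silently leaks temp/spill files, so surface a warning instead.
    def deleteOrWarn(file: File): Unit = {
      if (!file.delete()) {
        logger.warn(s"Error deleting ${file.getPath}")
      }
    }
  }
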
http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 4989b05..501dfe7 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -24,12 +24,15 @@ import com.google.common.io.ByteStreams;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
  * of the file format).
  */
 final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
+  private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
 
   private final File file;
   private InputStream in;
@@ -73,7 +76,9 @@ final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
     numRecordsRemaining--;
     if (numRecordsRemaining == 0) {
       in.close();
-      file.delete();
+      if (!file.delete() && file.exists()) {
+        logger.warn("Unable to delete spill file {}", file.getPath());
+      }
       in = null;
       din = null;
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 3788d18..19be093 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -871,7 +871,8 @@ private class PythonAccumulatorParam(@transient private val serverHost: String,
  * write the data into disk after deserialization, then Python can read it from disks.
  */
 // scalastyle:off no.finalize
-private[spark] class PythonBroadcast(@transient var path: String) extends Serializable {
+private[spark] class PythonBroadcast(@transient var path: String) extends Serializable
+  with Logging {
 
   /**
    * Read data from disks, then copy it to `out`
@@ -907,7 +908,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
     if (!path.isEmpty) {
       val file = new File(path)
       if (file.exists()) {
-        file.delete()
+        if (!file.delete()) {
+          logWarning(s"Error deleting ${file.getPath}")
+        }
       }
     }
   }
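The two hunks above use slightly different guards. UnsafeSorterSpillReader checks file.exists() after a failed delete, so no warning is emitted when the file was already removed by someone else; PythonBroadcast checks exists() before attempting the delete. Both avoid spurious warnings; the post-check also skips a second filesystem call on the common success path. A sketch of the post-check variant (names illustrative):

  import java.io.File

  import org.slf4j.LoggerFactory

  object SpillCleanup {
    private val logger = LoggerFactory.getLogger(getClass)

    // Warn only if delete() failed AND the file is still there: failing on an
    // already-missing file is expected when another cleaner got there first.
    def deleteSpill(file: File): Unit = {
      if (!file.delete() && file.exists()) {
        logger.warn("Unable to delete spill file {}", file.getPath)
      }
    }
  }
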
http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 4b28866..7d160b6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -175,8 +175,10 @@ private[deploy] object RPackageUtils extends Logging {
           print(s"ERROR: Failed to build R package in $file.", printStream)
           print(RJarDoc, printStream)
         }
-      } finally {
-        rSource.delete() // clean up
+      } finally { // clean up
+        if (!rSource.delete()) {
+          logWarning(s"Error deleting ${rSource.getPath()}")
+        }
       }
     } else {
       if (verbose) {
@@ -211,7 +213,9 @@ private[deploy] object RPackageUtils extends Logging {
     val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
     // create a zip file from scratch, do not append to existing file.
     val zipFile = new File(dir, name)
-    zipFile.delete()
+    if (!zipFile.delete()) {
+      logWarning(s"Error deleting ${zipFile.getPath()}")
+    }
     val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
     try {
       filesToBundle.foreach { file =>

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 8eb2ba1..5eb8adf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -242,7 +242,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         logError("Exception encountered when attempting to update last scan time", e)
         lastScanTime
       } finally {
-        fs.delete(path)
+        if (!fs.delete(path)) {
+          logWarning(s"Error deleting ${path}")
+        }
       }
     }
 
@@ -405,7 +407,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       try {
         val path = new Path(logDir, attempt.logPath)
         if (fs.exists(path)) {
-          fs.delete(path, true)
+          if (!fs.delete(path, true)) {
+            logWarning(s"Error deleting ${path}")
+          }
         }
       } catch {
         case e: AccessControlException =>

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index aa379d4..1aa8cd5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -45,7 +45,10 @@ private[master] class FileSystemPersistenceEngine(
   }
 
   override def unpersist(name: String): Unit = {
-    new File(dir + File.separator + name).delete()
+    val f = new File(dir + File.separator + name)
+    if (!f.delete()) {
+      logWarning(s"Error deleting ${f.getPath()}")
+    }
   }
 
   override def read[T: ClassTag](prefix: String): Seq[T] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 1c3b5da..a69be6a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -144,7 +144,9 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       } else {
         // Some other copy of this task must've finished before us and renamed it
         logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
-        fs.delete(tempOutputPath, false)
+        if (!fs.delete(tempOutputPath, false)) {
+          logWarning(s"Error deleting ${tempOutputPath}")
+        }
       }
     }
   }
http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index e9f6060..91cad66 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -89,7 +89,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
 }
 
-private[spark] object ReliableRDDCheckpointData {
+private[spark] object ReliableRDDCheckpointData extends Logging {
 
   /** Return the path of the directory to which this RDD's checkpoint data is written. */
   def checkpointPath(sc: SparkContext, rddId: Int): Option[Path] = {
@@ -101,7 +101,9 @@ private[spark] object ReliableRDDCheckpointData {
     checkpointPath(sc, rddId).foreach { path =>
       val fs = path.getFileSystem(sc.hadoopConfiguration)
       if (fs.exists(path)) {
-        fs.delete(path, true)
+        if (!fs.delete(path, true)) {
+          logWarning(s"Error deleting ${path.toString()}")
+        }
       }
     }
   }
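The Hadoop-side hunks (FsHistoryProvider, ReliableCheckpointRDD, ReliableRDDCheckpointData above) apply the same idiom to org.apache.hadoop.fs.FileSystem.delete(path, recursive). Note that this call also returns false when the path simply does not exist, so a warning from it does not always indicate a real failure. A minimal sketch against the Hadoop API (the helper name is illustrative):

  import org.apache.hadoop.fs.{FileSystem, Path}

  import org.slf4j.LoggerFactory

  object CheckpointCleanup {
    private val logger = LoggerFactory.getLogger(getClass)

    // Recursively delete a checkpoint directory; fs.delete returns false both
    // on failure and when nothing was there, hence the exists() pre-check.
    def removeCheckpointDir(fs: FileSystem, path: Path): Unit = {
      if (fs.exists(path) && !fs.delete(path, true)) {
        logger.warn(s"Error deleting ${path}")
      }
    }
  }
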
http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 5a06ef0..000a021 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -109,7 +109,9 @@ private[spark] class EventLoggingListener(
 
     if (shouldOverwrite && fileSystem.exists(path)) {
       logWarning(s"Event log $path already exists. Overwriting...")
-      fileSystem.delete(path, true)
+      if (!fileSystem.delete(path, true)) {
+        logWarning(s"Error deleting $path")
+      }
     }
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -216,7 +218,9 @@ private[spark] class EventLoggingListener(
     if (fileSystem.exists(target)) {
       if (shouldOverwrite) {
         logWarning(s"Event log $target already exists. Overwriting...")
-        fileSystem.delete(target, true)
+        if (!fileSystem.delete(target, true)) {
+          logWarning(s"Error deleting $target")
+        }
       } else {
         throw new IOException("Target log file already exists (%s)".format(logPath))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0324c9d..641638a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -65,7 +65,9 @@ private[spark] class SimrSchedulerBackend(
   override def stop() {
     val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
     val fs = FileSystem.get(conf)
-    fs.delete(new Path(driverFilePath), false)
+    if (!fs.delete(new Path(driverFilePath), false)) {
+      logWarning(s"error deleting ${driverFilePath}")
+    }
     super.stop()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index d9902f9..cd253a7 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -127,7 +127,10 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
       case Some(state) =>
         for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
           val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
-          blockManager.diskBlockManager.getFile(blockId).delete()
+          val file = blockManager.diskBlockManager.getFile(blockId)
+          if (!file.delete()) {
+            logWarning(s"Error deleting ${file.getPath()}")
+          }
         }
         logInfo("Deleted all files for shuffle " + shuffleId)
         true
http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index d0163d3..65887d1 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,7 +21,7 @@ import java.io._
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, Logging}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
@@ -40,7 +40,8 @@ import IndexShuffleBlockResolver.NOOP_REDUCE_ID
  */
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
-private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {
+private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver
+  with Logging {
 
   private lazy val blockManager = SparkEnv.get.blockManager
 
@@ -60,12 +61,16 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
   def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
     var file = getDataFile(shuffleId, mapId)
     if (file.exists()) {
-      file.delete()
+      if (!file.delete()) {
+        logWarning(s"Error deleting data ${file.getPath()}")
+      }
     }
 
     file = getIndexFile(shuffleId, mapId)
     if (file.exists()) {
-      file.delete()
+      if (!file.delete()) {
+        logWarning(s"Error deleting index ${file.getPath()}")
+      }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index feb9533..c008b9d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -86,7 +86,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     } catch {
       case e: Throwable =>
         if (file.exists()) {
-          file.delete()
+          if (!file.delete()) {
+            logWarning(s"Error deleting ${file}")
+          }
         }
         throw e
     }
@@ -155,7 +157,11 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
-      file.delete()
+      val ret = file.delete()
+      if (!ret) {
+        logWarning(s"Error deleting ${file.getPath()}")
+      }
+      ret
     } else {
       false
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index f929b12..29c5732 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -208,7 +208,9 @@ class ExternalAppendOnlyMap[K, V, C](
         writer.revertPartialWritesAndClose()
       }
       if (file.exists()) {
-        file.delete()
+        if (!file.delete()) {
+          logWarning(s"Error deleting ${file}")
+        }
       }
     }
   }
@@ -489,7 +491,9 @@ class ExternalAppendOnlyMap[K, V, C](
         fileStream = null
       }
       if (file.exists()) {
-        file.delete()
+        if (!file.delete()) {
+          logWarning(s"Error deleting ${file}")
+        }
       }
     }
logWarning(s"Error deleting ${file}") + } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index c5f93bb..0d4dd6a 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -114,10 +114,14 @@ public class ExternalShuffleBlockResolver { "recover state for existing applications", registeredExecutorFile, e); if (registeredExecutorFile.isDirectory()) { for (File f : registeredExecutorFile.listFiles()) { - f.delete(); + if (!f.delete()) { + logger.warn("error deleting {}", f.getPath()); + } } } - registeredExecutorFile.delete(); + if (!registeredExecutorFile.delete()) { + logger.warn("error deleting {}", registeredExecutorFile.getPath()); + } options.createIfMissing(true); try { tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org