Repository: spark
Updated Branches:
  refs/heads/master 50e463423 -> 27bfa9ab3


[SPARK-10721] Log warning when file deletion fails

Author: tedyu <yuzhih...@gmail.com>

Closes #8843 from tedyu/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27bfa9ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27bfa9ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27bfa9ab

Branch: refs/heads/master
Commit: 27bfa9ab3a610e072c011fd88ee4684cea6ceb76
Parents: 50e4634
Author: tedyu <yuzhih...@gmail.com>
Authored: Wed Sep 23 10:01:28 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Sep 23 10:01:28 2015 +0100

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeSorterSpillReader.java           |  7 ++++++-
 .../scala/org/apache/spark/api/python/PythonRDD.scala  |  7 +++++--
 .../scala/org/apache/spark/deploy/RPackageUtils.scala  | 10 +++++++---
 .../spark/deploy/history/FsHistoryProvider.scala       |  8 ++++++--
 .../deploy/master/FileSystemPersistenceEngine.scala    |  5 ++++-
 .../org/apache/spark/rdd/ReliableCheckpointRDD.scala   |  4 +++-
 .../apache/spark/rdd/ReliableRDDCheckpointData.scala   |  6 ++++--
 .../apache/spark/scheduler/EventLoggingListener.scala  |  8 ++++++--
 .../spark/scheduler/cluster/SimrSchedulerBackend.scala |  4 +++-
 .../spark/shuffle/FileShuffleBlockResolver.scala       |  5 ++++-
 .../spark/shuffle/IndexShuffleBlockResolver.scala      | 13 +++++++++----
 .../scala/org/apache/spark/storage/DiskStore.scala     | 10 ++++++++--
 .../spark/util/collection/ExternalAppendOnlyMap.scala  |  8 ++++++--
 .../apache/spark/util/collection/ExternalSorter.scala  |  4 +++-
 .../network/shuffle/ExternalShuffleBlockResolver.java  |  8 ++++++--
 15 files changed, 80 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
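
The pattern applied throughout is the same: rather than calling delete() and discarding its Boolean result, each call site now checks the result and logs a warning on failure. Both java.io.File.delete() and Hadoop's FileSystem.delete() report failure through their return value instead of throwing, so the old code could silently leak spill, checkpoint, and event-log files. A minimal Scala sketch of the before/after shape (SpillCleaner is a hypothetical class; Logging is the same trait the Scala call sites below mix in):

import java.io.File

import org.apache.spark.Logging

// Hypothetical example of the pattern this commit applies at each call site.
class SpillCleaner extends Logging {
  def cleanup(file: File): Unit = {
    // Before: file.delete()   -- Boolean result silently discarded
    // After: check the result and surface failures in the logs.
    if (!file.delete()) {
      logWarning(s"Error deleting ${file.getPath()}")
    }
  }
}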


http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 4989b05..501dfe7 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -24,12 +24,15 @@ import com.google.common.io.ByteStreams;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
 * Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
  * of the file format).
  */
 final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
+  private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
 
   private final File file;
   private InputStream in;
@@ -73,7 +76,9 @@ final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
     numRecordsRemaining--;
     if (numRecordsRemaining == 0) {
       in.close();
-      file.delete();
+      if (!file.delete() && file.exists()) {
+        logger.warn("Unable to delete spill file {}", file.getPath());
+      }
       in = null;
       din = null;
     }
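
This hunk is stricter than most of the Scala ones below: it warns only when delete() fails and the file still exists, so a spill file already removed by another thread or process does not produce a spurious warning. A hypothetical Scala equivalent of the same guard:

import java.io.File

import org.apache.spark.Logging

object SpillFiles extends Logging {
  // Treat a false return from delete() as a real failure only when the
  // file is still present afterwards; an already-missing file is fine.
  def deleteIfPresent(file: File): Unit = {
    if (!file.delete() && file.exists()) {
      logWarning(s"Unable to delete spill file ${file.getPath}")
    }
  }
}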

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 3788d18..19be093 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -871,7 +871,8 @@ private class PythonAccumulatorParam(@transient private val serverHost: String,
  * write the data into disk after deserialization, then Python can read it from disks.
  */
 // scalastyle:off no.finalize
-private[spark] class PythonBroadcast(@transient var path: String) extends Serializable {
+private[spark] class PythonBroadcast(@transient var path: String) extends Serializable
+  with Logging {
 
   /**
    * Read data from disks, then copy it to `out`
@@ -907,7 +908,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
     if (!path.isEmpty) {
       val file = new File(path)
       if (file.exists()) {
-        file.delete()
+        if (!file.delete()) {
+          logWarning(s"Error deleting ${file.getPath}")
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 4b28866..7d160b6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -175,8 +175,10 @@ private[deploy] object RPackageUtils extends Logging {
               print(s"ERROR: Failed to build R package in $file.", printStream)
               print(RJarDoc, printStream)
             }
-          } finally {
-            rSource.delete() // clean up
+          } finally { // clean up
+            if (!rSource.delete()) {
+              logWarning(s"Error deleting ${rSource.getPath()}")
+            }
           }
         } else {
           if (verbose) {
@@ -211,7 +213,9 @@ private[deploy] object RPackageUtils extends Logging {
     val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
     // create a zip file from scratch, do not append to existing file.
     val zipFile = new File(dir, name)
-    zipFile.delete()
+    if (!zipFile.delete()) {
+      logWarning(s"Error deleting ${zipFile.getPath()}")
+    }
+    val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
     try {
       filesToBundle.foreach { file =>

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 8eb2ba1..5eb8adf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -242,7 +242,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
        logError("Exception encountered when attempting to update last scan time", e)
         lastScanTime
     } finally {
-      fs.delete(path)
+      if (!fs.delete(path)) {
+        logWarning(s"Error deleting ${path}")
+      }
     }
   }
 
@@ -405,7 +407,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         try {
           val path = new Path(logDir, attempt.logPath)
           if (fs.exists(path)) {
-            fs.delete(path, true)
+            if (!fs.delete(path, true)) {
+              logWarning(s"Error deleting ${path}")
+            }
           }
         } catch {
           case e: AccessControlException =>
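
The fs.exists(path) guard kept here matters: under the Hadoop FileSystem contract, delete() returns false both when deletion fails and when the path is already absent, so without the guard the new warning would also fire for logs that were never written. A hypothetical helper with the same shape:

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.Logging

object HistoryCleanup extends Logging {
  // Warn only when a path that does exist cannot be removed; a missing
  // path also makes delete() return false, but is not an error here.
  def deleteLog(fs: FileSystem, path: Path): Unit = {
    if (fs.exists(path) && !fs.delete(path, true)) {
      logWarning(s"Error deleting $path")
    }
  }
}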

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index aa379d4..1aa8cd5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -45,7 +45,10 @@ private[master] class FileSystemPersistenceEngine(
   }
 
   override def unpersist(name: String): Unit = {
-    new File(dir + File.separator + name).delete()
+    val f = new File(dir + File.separator + name)
+    if (!f.delete()) {
+      logWarning(s"Error deleting ${f.getPath()}")
+    }
   }
 
   override def read[T: ClassTag](prefix: String): Seq[T] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 1c3b5da..a69be6a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -144,7 +144,9 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       } else {
        // Some other copy of this task must've finished before us and renamed it
        logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
-        fs.delete(tempOutputPath, false)
+        if (!fs.delete(tempOutputPath, false)) {
+          logWarning(s"Error deleting ${tempOutputPath}")
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index e9f6060..91cad66 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -89,7 +89,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
 
 }
 
-private[spark] object ReliableRDDCheckpointData {
+private[spark] object ReliableRDDCheckpointData extends Logging {
 
   /** Return the path of the directory to which this RDD's checkpoint data is written. */
   def checkpointPath(sc: SparkContext, rddId: Int): Option[Path] = {
@@ -101,7 +101,9 @@ private[spark] object ReliableRDDCheckpointData {
     checkpointPath(sc, rddId).foreach { path =>
       val fs = path.getFileSystem(sc.hadoopConfiguration)
       if (fs.exists(path)) {
-        fs.delete(path, true)
+        if (!fs.delete(path, true)) {
+          logWarning(s"Error deleting ${path.toString()}")
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 5a06ef0..000a021 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -109,7 +109,9 @@ private[spark] class EventLoggingListener(
 
     if (shouldOverwrite && fileSystem.exists(path)) {
       logWarning(s"Event log $path already exists. Overwriting...")
-      fileSystem.delete(path, true)
+      if (!fileSystem.delete(path, true)) {
+        logWarning(s"Error deleting $path")
+      }
     }
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -216,7 +218,9 @@ private[spark] class EventLoggingListener(
     if (fileSystem.exists(target)) {
       if (shouldOverwrite) {
         logWarning(s"Event log $target already exists. Overwriting...")
-        fileSystem.delete(target, true)
+        if (!fileSystem.delete(target, true)) {
+          logWarning(s"Error deleting $target")
+        }
       } else {
        throw new IOException("Target log file already exists (%s)".format(logPath))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0324c9d..641638a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -65,7 +65,9 @@ private[spark] class SimrSchedulerBackend(
   override def stop() {
     val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
     val fs = FileSystem.get(conf)
-    fs.delete(new Path(driverFilePath), false)
+    if (!fs.delete(new Path(driverFilePath), false)) {
+      logWarning(s"error deleting ${driverFilePath}")
+    }
     super.stop()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index d9902f9..cd253a7 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -127,7 +127,10 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
       case Some(state) =>
         for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
           val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
-          blockManager.diskBlockManager.getFile(blockId).delete()
+          val file = blockManager.diskBlockManager.getFile(blockId)
+          if (!file.delete()) {
+            logWarning(s"Error deleting ${file.getPath()}")
+          }
         }
         logInfo("Deleted all files for shuffle " + shuffleId)
         true

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index d0163d3..65887d1 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,7 +21,7 @@ import java.io._
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, Logging}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
@@ -40,7 +40,8 @@ import IndexShuffleBlockResolver.NOOP_REDUCE_ID
  */
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
-private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {
+private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver
+  with Logging {
 
   private lazy val blockManager = SparkEnv.get.blockManager
 
@@ -60,12 +61,16 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
   def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
     var file = getDataFile(shuffleId, mapId)
     if (file.exists()) {
-      file.delete()
+      if (!file.delete()) {
+        logWarning(s"Error deleting data ${file.getPath()}")
+      }
     }
 
     file = getIndexFile(shuffleId, mapId)
     if (file.exists()) {
-      file.delete()
+      if (!file.delete()) {
+        logWarning(s"Error deleting index ${file.getPath()}")
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index feb9533..c008b9d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -86,7 +86,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     } catch {
       case e: Throwable =>
         if (file.exists()) {
-          file.delete()
+          if (!file.delete()) {
+            logWarning(s"Error deleting ${file}")
+          }
         }
         throw e
     }
@@ -155,7 +157,11 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
-      file.delete()
+      val ret = file.delete()
+      if (!ret) {
+        logWarning(s"Error deleting ${file.getPath()}")
+      }
+      ret
     } else {
       false
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index f929b12..29c5732 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -208,7 +208,9 @@ class ExternalAppendOnlyMap[K, V, C](
           writer.revertPartialWritesAndClose()
         }
         if (file.exists()) {
-          file.delete()
+          if (!file.delete()) {
+            logWarning(s"Error deleting ${file}")
+          }
         }
       }
     }
@@ -489,7 +491,9 @@ class ExternalAppendOnlyMap[K, V, C](
         fileStream = null
       }
       if (file.exists()) {
-        file.delete()
+        if (!file.delete()) {
+          logWarning(s"Error deleting ${file}")
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 2a30f75..749be34 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -318,7 +318,9 @@ private[spark] class ExternalSorter[K, V, C](
           writer.revertPartialWritesAndClose()
         }
         if (file.exists()) {
-          file.delete()
+          if (!file.delete()) {
+            logWarning(s"Error deleting ${file}")
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/27bfa9ab/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index c5f93bb..0d4dd6a 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -114,10 +114,14 @@ public class ExternalShuffleBlockResolver {
             "recover state for existing applications", registeredExecutorFile, 
e);
           if (registeredExecutorFile.isDirectory()) {
             for (File f : registeredExecutorFile.listFiles()) {
-              f.delete();
+              if (!f.delete()) {
+                logger.warn("error deleting {}", f.getPath());
+              }
             }
           }
-          registeredExecutorFile.delete();
+          if (!registeredExecutorFile.delete()) {
+            logger.warn("error deleting {}", registeredExecutorFile.getPath());
+          }
           options.createIfMissing(true);
           try {
             tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);

