Repository: spark
Updated Branches:
  refs/heads/master 3977223a3 -> 28fafa3ee


[SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist

## What changes were proposed in this pull request?

The `ListingFileCatalog` lists files given a set of resolved paths. If a folder 
is deleted at any time between when the paths are resolved and when the file 
catalog checks the folder, the Spark job fails. This may abruptly stop 
long-running StructuredStreaming jobs, for example.

Folders may be deleted by users or automatically by retention policies. These 
cases should not prevent jobs from successfully completing.

## How was this patch tested?

Unit test in `FileCatalogSuite`

Author: Burak Yavuz <brk...@gmail.com>

Closes #15153 from brkyvz/SPARK-17599.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28fafa3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28fafa3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28fafa3e

Branch: refs/heads/master
Commit: 28fafa3ee8f3478fa441e7bd6c8fd4ab482ca98e
Parents: 3977223
Author: Burak Yavuz <brk...@gmail.com>
Authored: Wed Sep 21 17:07:16 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Sep 21 17:07:16 2016 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/ListingFileCatalog.scala  | 12 ++++++++++--
 .../sql/execution/datasources/FileCatalogSuite.scala    | 11 +++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28fafa3e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 60742bd..3253208 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.FileNotFoundException
+
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
@@ -97,8 +99,14 @@ class ListingFileCatalog(
         logTrace(s"Listing $path on driver")
 
         val childStatuses = {
-          val stats = fs.listStatus(path)
-          if (pathFilter != null) stats.filter(f => 
pathFilter.accept(f.getPath)) else stats
+          try {
+            val stats = fs.listStatus(path)
+            if (pathFilter != null) stats.filter(f => 
pathFilter.accept(f.getPath)) else stats
+          } catch {
+            case _: FileNotFoundException =>
+              logWarning(s"The directory $path was not found. Was it deleted 
very recently?")
+              Array.empty[FileStatus]
+          }
         }
 
         childStatuses.map {

http://git-wip-us.apache.org/repos/asf/spark/blob/28fafa3e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 0d9ea51..5c8d322 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -67,4 +67,15 @@ class FileCatalogSuite extends SharedSQLContext {
 
     }
   }
+
+  test("ListingFileCatalog: folders that don't exist don't throw exceptions") {
+    withTempDir { dir =>
+      val deletedFolder = new File(dir, "deleted")
+      assert(!deletedFolder.exists())
+      val catalog1 = new ListingFileCatalog(
+        spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None)
+      // doesn't throw an exception
+      assert(catalog1.listLeafFiles(catalog1.paths).isEmpty)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to