Repository: spark Updated Branches: refs/heads/master 3977223a3 -> 28fafa3ee
[SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist ## What changes were proposed in this pull request? The `ListingFileCatalog` lists files given a set of resolved paths. If a folder is deleted at any time between when the paths are resolved and when the file catalog checks for the folder, the Spark job fails. For example, this may abruptly stop long-running Structured Streaming jobs. Folders may be deleted by users or automatically by retention policies. These cases should not prevent jobs from completing successfully. ## How was this patch tested? Unit test in `FileCatalogSuite`. Author: Burak Yavuz <brk...@gmail.com> Closes #15153 from brkyvz/SPARK-17599. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28fafa3e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28fafa3e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28fafa3e Branch: refs/heads/master Commit: 28fafa3ee8f3478fa441e7bd6c8fd4ab482ca98e Parents: 3977223 Author: Burak Yavuz <brk...@gmail.com> Authored: Wed Sep 21 17:07:16 2016 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Sep 21 17:07:16 2016 +0800 ---------------------------------------------------------------------- .../sql/execution/datasources/ListingFileCatalog.scala | 12 ++++++++++-- .../sql/execution/datasources/FileCatalogSuite.scala | 11 +++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/28fafa3e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala index 60742bd..3253208 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import java.io.FileNotFoundException + import scala.collection.mutable import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} @@ -97,8 +99,14 @@ class ListingFileCatalog( logTrace(s"Listing $path on driver") val childStatuses = { - val stats = fs.listStatus(path) - if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats + try { + val stats = fs.listStatus(path) + if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats + } catch { + case _: FileNotFoundException => + logWarning(s"The directory $path was not found. Was it deleted very recently?") + Array.empty[FileStatus] + } } childStatuses.map { http://git-wip-us.apache.org/repos/asf/spark/blob/28fafa3e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index 0d9ea51..5c8d322 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -67,4 +67,15 @@ class FileCatalogSuite extends SharedSQLContext { } } + + test("ListingFileCatalog: folders that don't exist don't throw exceptions") { + withTempDir { dir => + val deletedFolder = new File(dir, "deleted") + assert(!deletedFolder.exists()) + val catalog1 = new ListingFileCatalog( + spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) + // doesn't throw an exception + 
assert(catalog1.listLeafFiles(catalog1.paths).isEmpty) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org