Repository: spark
Updated Branches:
  refs/heads/master 5415963d2 -> 8e9863531


[SPARK-22366] Support ignoring missing files

## What changes were proposed in this pull request?

Add a flag "spark.sql.files.ignoreMissingFiles" to parallel the existing flag 
"spark.sql.files.ignoreCorruptFiles".

## How was this patch tested?

New unit test "Enabling/disabling ignoreMissingFiles" in ParquetQuerySuite, exercising both the enabled and disabled settings of the flag.

Author: Jose Torres <j...@databricks.com>

Closes #19581 from joseph-torres/SPARK-22366.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e986353
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e986353
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e986353

Branch: refs/heads/master
Commit: 8e9863531bebbd4d83eafcbc2b359b8bd0ac5734
Parents: 5415963
Author: Jose Torres <j...@databricks.com>
Authored: Thu Oct 26 16:55:30 2017 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Thu Oct 26 16:55:30 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  8 +++++
 .../sql/execution/datasources/FileScanRDD.scala | 13 +++++---
 .../datasources/parquet/ParquetQuerySuite.scala | 33 ++++++++++++++++++++
 3 files changed, 50 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e986353/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4cfe53b..21e4685 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -614,6 +614,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val IGNORE_MISSING_FILES = buildConf("spark.sql.files.ignoreMissingFiles")
+    .doc("Whether to ignore missing files. If true, the Spark jobs will 
continue to run when " +
+      "encountering missing files and the contents that have been read will 
still be returned.")
+    .booleanConf
+    .createWithDefault(false)
+
   val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile")
     .doc("Maximum number of records to write out to a single file. " +
       "If this value is zero or negative, there is no limit.")
@@ -1014,6 +1020,8 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
 
+  def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
+
   def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)

http://git-wip-us.apache.org/repos/asf/spark/blob/8e986353/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 9df2073..8731ee8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -66,6 +66,7 @@ class FileScanRDD(
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
   private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+  private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
 
   override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
@@ -142,7 +143,7 @@ class FileScanRDD(
           // Sets InputFileBlockHolder for the file block's information
           InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
 
-          if (ignoreCorruptFiles) {
+          if (ignoreMissingFiles || ignoreCorruptFiles) {
             currentIterator = new NextIterator[Object] {
               // The readFunction may read some bytes before consuming the iterator, e.g.,
               // vectorized Parquet reader. Here we use lazy val to delay the creation of
@@ -158,9 +159,13 @@ class FileScanRDD(
                     null
                   }
                 } catch {
-                  // Throw FileNotFoundException even `ignoreCorruptFiles` is true
-                  case e: FileNotFoundException => throw e
-                  case e @ (_: RuntimeException | _: IOException) =>
+                  case e: FileNotFoundException if ignoreMissingFiles =>
+                    logWarning(s"Skipped missing file: $currentFile", e)
+                    finished = true
+                    null
+                  // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+                  case e: FileNotFoundException if !ignoreMissingFiles => throw e
+                  case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
                     logWarning(
                       s"Skipped the rest of the content in the corrupted file: $currentFile", e)
                     finished = true
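
Note on the catch-clause ordering above: java.io.FileNotFoundException is a subclass of IOException, so the two missing-file cases must come before the `ignoreCorruptFiles` case, otherwise a missing file would be silently skipped whenever corrupt-file skipping is enabled. A simplified standalone sketch of the same pattern (not the Spark source; `readLines` and the flag arguments are illustrative):

```scala
import java.io.{FileNotFoundException, IOException}
import scala.io.Source

// Guarded catch clauses: missing files are handled (or rethrown) before the
// broader IOException case gets a chance to swallow them.
def readLines(
    path: String,
    ignoreMissingFiles: Boolean,
    ignoreCorruptFiles: Boolean): Seq[String] = {
  try {
    val src = Source.fromFile(path)
    try src.getLines().toList finally src.close()
  } catch {
    case e: FileNotFoundException if ignoreMissingFiles =>
      println(s"Skipped missing file: $path ($e)")
      Nil                                   // treat the missing file as empty
    case e: FileNotFoundException =>
      throw e                               // missing files still fail fast
    case e: IOException if ignoreCorruptFiles =>
      println(s"Skipped corrupted file: $path ($e)")
      Nil                                   // drop the unreadable content
  }
}
```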

http://git-wip-us.apache.org/repos/asf/spark/blob/8e986353/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 2efff3f..e822e40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -316,6 +316,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  testQuietly("Enabling/disabling ignoreMissingFiles") {
+    def testIgnoreMissingFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        val thirdPath = new Path(basePath, "third")
+        spark.range(2, 3).toDF("a").write.parquet(thirdPath.toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+
+        val fs = thirdPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+        fs.delete(thirdPath, true)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+      testIgnoreMissingFiles()
+    }
+
+    withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+      val exception = intercept[SparkException] {
+        testIgnoreMissingFiles()
+      }
+      assert(exception.getMessage().contains("does not exist"))
+    }
+  }
+
   /**
    * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
    * to increase the chance of failure


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
