Repository: spark Updated Branches: refs/heads/master 5415963d2 -> 8e9863531
[SPARK-22366] Support ignoring missing files ## What changes were proposed in this pull request? Add a flag "spark.sql.files.ignoreMissingFiles" to parallel the existing flag "spark.sql.files.ignoreCorruptFiles". ## How was this patch tested? new unit test Author: Jose Torres <j...@databricks.com> Closes #19581 from joseph-torres/SPARK-22366. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e986353 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e986353 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e986353 Branch: refs/heads/master Commit: 8e9863531bebbd4d83eafcbc2b359b8bd0ac5734 Parents: 5415963 Author: Jose Torres <j...@databricks.com> Authored: Thu Oct 26 16:55:30 2017 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Thu Oct 26 16:55:30 2017 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/internal/SQLConf.scala | 8 +++++ .../sql/execution/datasources/FileScanRDD.scala | 13 +++++--- .../datasources/parquet/ParquetQuerySuite.scala | 33 ++++++++++++++++++++ 3 files changed, 50 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8e986353/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4cfe53b..21e4685 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -614,6 +614,12 @@ object SQLConf { .booleanConf .createWithDefault(false) + val IGNORE_MISSING_FILES = buildConf("spark.sql.files.ignoreMissingFiles") + .doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " + + "encountering missing files and the contents that have been read will still be returned.") + .booleanConf + .createWithDefault(false) + val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile") .doc("Maximum number of records to write out to a single file. " + "If this value is zero or negative, there is no limit.") @@ -1014,6 +1020,8 @@ class SQLConf extends Serializable with Logging { def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES) + def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES) + def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE) def useCompression: Boolean = getConf(COMPRESS_CACHED) http://git-wip-us.apache.org/repos/asf/spark/blob/8e986353/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 9df2073..8731ee8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -66,6 +66,7 @@ class FileScanRDD( extends RDD[InternalRow](sparkSession.sparkContext, Nil) { private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles + private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = { val iterator = new Iterator[Object] with AutoCloseable { @@ -142,7 +143,7 @@ class FileScanRDD( // Sets InputFileBlockHolder for the file block's information InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) - if (ignoreCorruptFiles) { + if (ignoreMissingFiles || ignoreCorruptFiles) { currentIterator = new NextIterator[Object] { // The readFunction may read some bytes before consuming the iterator, e.g., // vectorized Parquet reader. Here we use lazy val to delay the creation of @@ -158,9 +159,13 @@ class FileScanRDD( null } } catch { - // Throw FileNotFoundException even `ignoreCorruptFiles` is true - case e: FileNotFoundException => throw e - case e @ (_: RuntimeException | _: IOException) => + case e: FileNotFoundException if ignoreMissingFiles => + logWarning(s"Skipped missing file: $currentFile", e) + finished = true + null + // Throw FileNotFoundException even if `ignoreCorruptFiles` is true + case e: FileNotFoundException if !ignoreMissingFiles => throw e + case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => logWarning( s"Skipped the rest of the content in the corrupted file: $currentFile", e) finished = true http://git-wip-us.apache.org/repos/asf/spark/blob/8e986353/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 2efff3f..e822e40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -316,6 +316,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } + testQuietly("Enabling/disabling ignoreMissingFiles") { + def testIgnoreMissingFiles(): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath + spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString) + spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString) + val thirdPath = new Path(basePath, "third") + spark.range(2, 3).toDF("a").write.parquet(thirdPath.toString) + val df = spark.read.parquet( + new Path(basePath, "first").toString, + new Path(basePath, "second").toString, + new Path(basePath, "third").toString) + + val fs = thirdPath.getFileSystem(spark.sparkContext.hadoopConfiguration) + fs.delete(thirdPath, true) + checkAnswer( + df, + Seq(Row(0), Row(1))) + } + } + + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") { + testIgnoreMissingFiles() + } + + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") { + val exception = intercept[SparkException] { + testIgnoreMissingFiles() + } + assert(exception.getMessage().contains("does not exist")) + } + } + /** * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop * to increase the chance of failure --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org