Repository: spark Updated Branches: refs/heads/master 330fda8aa -> b47b892e4
[SPARK-18774][CORE][SQL] Ignore non-existing files when ignoreCorruptFiles is enabled ## What changes were proposed in this pull request? When `ignoreCorruptFiles` is enabled, it's better to also ignore non-existing files. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixi...@databricks.com> Closes #16203 from zsxwing/ignore-file-not-found. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b47b892e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b47b892e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b47b892e Branch: refs/heads/master Commit: b47b892e4579b7b06b4b2837ee4b614e517789f9 Parents: 330fda8 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Wed Dec 7 22:37:04 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Wed Dec 7 22:37:04 2016 -0800 ---------------------------------------------------------------------- .../apache/spark/internal/config/package.scala | 3 ++- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 14 +++++++++++--- .../org/apache/spark/rdd/NewHadoopRDD.scala | 19 +++++++++++++++---- .../sql/execution/datasources/FileScanRDD.scala | 3 +++ .../org/apache/spark/sql/internal/SQLConf.scala | 3 ++- 5 files changed, 33 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/core/src/main/scala/org/apache/spark/internal/config/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index a69a2b5..78aed4f 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -203,7 +203,8 @@ package object config { private[spark] val IGNORE_CORRUPT_FILES = 
ConfigBuilder("spark.files.ignoreCorruptFiles") .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " + - "encountering corrupt files and contents that have been read will still be returned.") + "encountering corrupted or non-existing files and contents that have been read will still " + + "be returned.") .booleanConf .createWithDefault(false) http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 6e87233..a83e139 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -248,12 +248,20 @@ class HadoopRDD[K, V]( HadoopRDD.addLocalConfiguration( new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime), context.stageId, theSplit.index, context.attemptNumber, jobConf) - reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) + reader = + try { + inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) + } catch { + case e: IOException if ignoreCorruptFiles => + logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e) + finished = true + null + } // Register an on-task-completion callback to close the input stream. 
context.addTaskCompletionListener{ context => closeIfNeeded() } - private val key: K = reader.createKey() - private val value: V = reader.createValue() + private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey() + private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue() override def getNext(): (K, V) = { try { http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index e805192..733e85f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -174,14 +174,25 @@ class NewHadoopRDD[K, V]( } private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - private var reader = format.createRecordReader( - split.serializableHadoopSplit.value, hadoopAttemptContext) - reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) + private var finished = false + private var reader = + try { + val _reader = format.createRecordReader( + split.serializableHadoopSplit.value, hadoopAttemptContext) + _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) + _reader + } catch { + case e: IOException if ignoreCorruptFiles => + logWarning( + s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}", + e) + finished = true + null + } // Register an on-task-completion callback to close the input stream. 
context.addTaskCompletionListener(context => close()) private var havePair = false - private var finished = false private var recordsSinceMetricsUpdate = 0 override def hasNext: Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 6d8cd81..e753cd9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -151,6 +151,9 @@ class FileScanRDD( currentIterator = readFunction(currentFile) } } catch { + case e: IOException if ignoreCorruptFiles => + logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e) + currentIterator = Iterator.empty case e: java.io.FileNotFoundException => throw new java.io.FileNotFoundException( e.getMessage + "\n" + http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 91f3fe0..c03e88b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -632,7 +632,8 @@ object SQLConf { val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles") .doc("Whether to ignore corrupt files. 
If true, the Spark jobs will continue to run when " + - "encountering corrupt files and contents that have been read will still be returned.") + "encountering corrupted or non-existing files and contents that have been read will still be " + + "returned.") .booleanConf .createWithDefault(false) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org