Repository: spark
Updated Branches:
  refs/heads/branch-2.1 bf2f233e4 -> 4f3ce062c
[SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet

## What changes were proposed in this pull request?

We have a config `spark.sql.files.ignoreCorruptFiles` which can be used to ignore corrupt files when reading files in SQL. Currently, the `ignoreCorruptFiles` config has two issues that prevent it from working for Parquet:

1. We only ignore corrupt files in `FileScanRDD`. In fact, we begin reading those files as early as schema inference. For corrupt files, we can't read the schema, and the program fails. A related issue was reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume that a file is only read once we start consuming its iterator. However, the file may be read before that (e.g. by the vectorized Parquet reader). In this case, the `ignoreCorruptFiles` config doesn't work either.

This patch targets the Parquet datasource. If this direction is OK, we can address the same issue for other datasources such as ORC.

Two main changes in this patch:

1. Replace `ParquetFileReader.readAllFootersInParallel` with our own logic that reads footers in a multi-threaded manner. We can't ignore corrupt files if we use `ParquetFileReader.readAllFootersInParallel`, so this patch implements equivalent logic in `readParquetFootersInParallel`.
2. In `FileScanRDD`, we also need to ignore corrupt files when we call `readFunction` to return the iterator.

One thing to note: we read the schema from the Parquet file's footer, and the method that reads the footer, `ParquetFileReader.readFooter`, throws a `RuntimeException` instead of an `IOException` if it can't successfully read the footer. Please check out https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L470. So this patch catches `RuntimeException`. One concern is that this may also mask runtime exceptions unrelated to corrupt files.
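The footer-reading change can be illustrated outside Spark. The following is a minimal plain-Scala sketch, not the patch's code: it swaps the patch's `partFiles.par` with a `ForkJoinTaskSupport` for a `java.util.concurrent` fixed thread pool (parallel collections were moved out of the Scala standard library in 2.13), and `Footer` and the `readFooter` function passed in are hypothetical stand-ins for the Parquet types. As in the patch, a `RuntimeException` from the reader either drops that file or is rethrown as an `IOException`, depending on `ignoreCorruptFiles`.

```scala
import java.io.IOException
import java.util.concurrent.{Callable, ExecutionException, Executors, Future}

object ParallelFooterSketch {
  // Hypothetical stand-in for org.apache.parquet.hadoop.Footer.
  final case class Footer(path: String, schema: String)

  /**
   * Reads one footer per path on a fixed pool of 8 threads (the patch also uses 8).
   * `readFooter` is a hypothetical reader that, like ParquetFileReader.readFooter,
   * reports an unreadable footer with a RuntimeException.
   */
  def readFootersInParallel(
      paths: Seq[String],
      ignoreCorruptFiles: Boolean,
      readFooter: String => Footer): Seq[Footer] = {
    val pool = Executors.newFixedThreadPool(8)
    try {
      // Submit every read up front so they run concurrently.
      val tasks: Seq[Future[Option[Footer]]] = paths.map { path =>
        pool.submit(new Callable[Option[Footer]] {
          override def call(): Option[Footer] =
            try Some(readFooter(path))
            catch {
              case e: RuntimeException =>
                if (ignoreCorruptFiles) None // skip the corrupt file
                else throw new IOException(s"Could not read footer for file: $path", e)
            }
        })
      }
      // Future.get wraps a task's exception in ExecutionException; rethrow the original.
      tasks.flatMap { task =>
        try task.get().toList
        catch { case e: ExecutionException => throw e.getCause }
      }
    } finally {
      pool.shutdown()
    }
  }
}
```

Called with `ignoreCorruptFiles = true` on a list that contains one non-Parquet file, the sketch returns only the readable footers; with `false`, the unreadable file surfaces as an `IOException` whose message starts with `Could not read footer for file`, mirroring what `ParquetFileFormatSuite` checks.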
## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #16474 from viirya/fix-ignorecorrupted-parquet-files.

(cherry picked from commit 61e48f52d1d8c7431707bd3511b6fe9f0ae996c0)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f3ce062
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f3ce062
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f3ce062

Branch: refs/heads/branch-2.1
Commit: 4f3ce062ce2e9b403f9d38a44eb7fc76a800ed67
Parents: bf2f233
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Mon Jan 16 15:26:41 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Jan 16 15:27:01 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/UnionRDD.scala   |  2 +-
 .../sql/execution/datasources/FileScanRDD.scala | 12 +++-
 .../datasources/parquet/ParquetFileFormat.scala | 45 +++++++++++++--
 .../parquet/ParquetFileFormatSuite.scala        | 59 ++++++++++++++++++++
 .../datasources/parquet/ParquetQuerySuite.scala | 30 ++++++++++
 5 files changed, 140 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f3ce062/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index ad1fddb..60e383a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4f3ce062/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index b926b92..8434592 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -134,7 +134,17 @@ class FileScanRDD(
           try {
             if (ignoreCorruptFiles) {
               currentIterator = new NextIterator[Object] {
-                private val internalIter = readFunction(currentFile)
+                private val internalIter = {
+                  try {
+                    // The readFunction may read files before consuming the iterator.
+                    // E.g., vectorized Parquet reader.
+                    readFunction(currentFile)
+                  } catch {
+                    case e @(_: RuntimeException | _: IOException) =>
+                      logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+                      Iterator.empty
+                  }
+                }
 
                 override def getNext(): AnyRef = {
                   try {

http://git-wip-us.apache.org/repos/asf/spark/blob/4f3ce062/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 0965ffe..0e1fc7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.IOException
 import java.net.URI
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Try}
 
 import org.apache.hadoop.conf.Configuration
@@ -30,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
@@ -151,7 +155,7 @@ class ParquetFileFormat
     }
   }
 
-  def inferSchema(
+  override def inferSchema(
       sparkSession: SparkSession,
       parameters: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
@@ -548,6 +552,36 @@ object ParquetFileFormat extends Logging {
   }
 
   /**
+   * Reads Parquet footers in multi-threaded manner.
+   * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
+   * files when reading footers.
+   */
+  private[parquet] def readParquetFootersInParallel(
+      conf: Configuration,
+      partFiles: Seq[FileStatus],
+      ignoreCorruptFiles: Boolean): Seq[Footer] = {
+    val parFiles = partFiles.par
+    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    parFiles.flatMap { currentFile =>
+      try {
+        // Skips row group information since we only need the schema.
+        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+        // when it can't read the footer.
+        Some(new Footer(currentFile.getPath(),
+          ParquetFileReader.readFooter(
+            conf, currentFile, SKIP_ROW_GROUPS)))
+      } catch { case e: RuntimeException =>
+        if (ignoreCorruptFiles) {
+          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+          None
+        } else {
+          throw new IOException(s"Could not read footer for file: $currentFile", e)
+        }
+      }
+    }.seq
+  }
+
+  /**
    * Figures out a merged Parquet schema with a distributed Spark job.
    *
    * Note that locality is not taken into consideration here because:
@@ -587,6 +621,8 @@ object ParquetFileFormat extends Logging {
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
       sparkSession.sparkContext.defaultParallelism)
 
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
@@ -598,13 +634,10 @@ object ParquetFileFormat extends Logging {
             new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
           }.toSeq
 
-          // Skips row group information since we only need the schema
-          val skipRowGroups = true
-
           // Reads footers in multi-threaded manner within each task
           val footers =
-            ParquetFileReader.readAllFootersInParallel(
-              serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
+            ParquetFileFormat.readParquetFootersInParallel(
+              serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
 
           // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
           val converter =

http://git-wip-us.apache.org/repos/asf/spark/blob/4f3ce062/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
new file mode 100644
index 0000000..ccb3435
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  test("read parquet footers in parallel") {
+    def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
+      withTempDir { dir =>
+        val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+        val basePath = dir.getCanonicalPath
+
+        val path1 = new Path(basePath, "first")
+        val path2 = new Path(basePath, "second")
+        val path3 = new Path(basePath, "third")
+
+        spark.range(1).toDF("a").coalesce(1).write.parquet(path1.toString)
+        spark.range(1, 2).toDF("a").coalesce(1).write.parquet(path2.toString)
+        spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString)
+
+        val fileStatuses =
+          Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten
+
+        val footers = ParquetFileFormat.readParquetFootersInParallel(
+          sparkContext.hadoopConfiguration, fileStatuses, ignoreCorruptFiles)
+
+        assert(footers.size == 2)
+      }
+    }
+
+    testReadFooters(true)
+    val exception = intercept[java.io.IOException] {
+      testReadFooters(false)
+    }
+    assert(exception.getMessage().contains("Could not read footer for file"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4f3ce062/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 4c4a7d8..6132376 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -212,6 +213,35 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("Enabling/disabling ignoreCorruptFiles") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      testIgnoreCorruptFiles()
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      val exception = intercept[SparkException] {
+        testIgnoreCorruptFiles()
+      }
+      assert(exception.getMessage().contains("is not a Parquet file"))
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
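The `FileScanRDD` hunk in this commit moves the `readFunction(currentFile)` call inside its own `try`, because some readers fail while the iterator is being built rather than while it is consumed. The plain-Scala sketch below, which does not use Spark, contrasts the two shapes under that assumption: `eagerReader` is a hypothetical reader that throws from the call itself, `guardConsumptionOnly` only protects `hasNext`, and `guardCreationToo` also protects the call, falling back to an empty iterator as the patch does.

```scala
import java.io.IOException

object EagerReadSketch {
  // Hypothetical reader: like the vectorized Parquet reader, it fails while
  // building the iterator, before any element is requested.
  def eagerReader(rows: Option[Seq[Int]]): Iterator[Int] = rows match {
    case Some(r) => r.iterator
    case None    => throw new IOException("is not a Parquet file")
  }

  // Pre-patch shape: only hasNext is guarded, so an eager failure escapes.
  def guardConsumptionOnly[T](read: => Iterator[T]): Iterator[T] = {
    val underlying = read // evaluated outside any try block
    new Iterator[T] {
      private var failed = false
      override def hasNext: Boolean =
        !failed && (try underlying.hasNext catch {
          case _: RuntimeException | _: IOException =>
            failed = true
            false
        })
      override def next(): T =
        if (hasNext) underlying.next() else throw new NoSuchElementException("end of input")
    }
  }

  // Post-patch shape: the call that creates the iterator is guarded as well.
  def guardCreationToo[T](read: => Iterator[T]): Iterator[T] =
    guardConsumptionOnly {
      try read
      catch { case _: RuntimeException | _: IOException => Iterator.empty }
    }
}
```

Falling back to `Iterator.empty` means a file that fails this early contributes no rows at all, and the broad `RuntimeException` case carries the same concern the commit message raises: unrelated runtime failures are swallowed too.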