Repository: spark
Updated Branches:
  refs/heads/branch-2.1 bf2f233e4 -> 4f3ce062c


[SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet

## What changes were proposed in this pull request?

We have a config `spark.sql.files.ignoreCorruptFiles` which can be used to
ignore corrupt files when reading files in SQL. Currently the
`ignoreCorruptFiles` config has two issues and does not work for Parquet:

1. We only ignore corrupt files in `FileScanRDD`. However, we actually begin
reading files as early as inferring the data schema from them. For a corrupt
file we fail to read the schema, and the job aborts. A related issue was
reported at
http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume the files are only read once we start consuming
the iterator. However, the files may be read before that (e.g., by the
vectorized Parquet reader), in which case the `ignoreCorruptFiles` config does
not work either.

This patch targets the Parquet datasource. If this direction is acceptable, we
can address the same issue for other datasources such as ORC.
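
From the user's point of view, the intended behavior looks roughly like the
sketch below. The session setup and paths here are illustrative only and not
part of this patch:

```scala
// Illustrative usage sketch: paths and data are hypothetical.
// Enable skipping of corrupt files for file-based SQL data sources.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// If /tmp/events mixes valid Parquet files with a corrupt (non-Parquet) file,
// the read should return rows from the readable files and log a warning for
// the corrupt file instead of failing the whole query.
val df = spark.read.parquet("/tmp/events")
df.show()
```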

Two main changes in this patch:

1. Replace `ParquetFileReader.readAllFootersInParallel` with our own logic for
reading footers in a multi-threaded manner (see the sketch after this list)

    We can't ignore corrupt files if we use
`ParquetFileReader.readAllFootersInParallel`, so this patch implements
equivalent logic in `readParquetFootersInParallel` that can skip corrupt
footers.

2. In `FileScanRDD`, we also need to ignore corrupt files when we call
`readFunction` to obtain the iterator, since the files may already be read at
that point.
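
A condensed, self-contained sketch of the footer-reading logic from change 1.
The helper name below is illustrative; the actual method added by this patch is
`readParquetFootersInParallel` in `ParquetFileFormat.scala`, shown in full in
the diff below:

```scala
import java.io.IOException

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{Footer, ParquetFileReader}

// Reads footers with a small thread pool; a corrupt file is either skipped or
// rethrown as IOException, depending on ignoreCorruptFiles.
def readFootersSkippingCorrupt(
    conf: Configuration,
    partFiles: Seq[FileStatus],
    ignoreCorruptFiles: Boolean): Seq[Footer] = {
  val parFiles = partFiles.par
  parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
  parFiles.flatMap { currentFile =>
    try {
      // Skip row group information since only the schema is needed.
      Some(new Footer(currentFile.getPath(),
        ParquetFileReader.readFooter(conf, currentFile, SKIP_ROW_GROUPS)))
    } catch {
      // readFooter signals unreadable footers with RuntimeException.
      case e: RuntimeException =>
        if (ignoreCorruptFiles) None
        else throw new IOException(s"Could not read footer for file: $currentFile", e)
    }
  }.seq
}
```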

One thing to note:

We read the schema from the Parquet file's footer. The footer-reading method
`ParquetFileReader.readFooter` throws `RuntimeException`, instead of
`IOException`, if it can't successfully read the footer. Please check out
https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L470.
So this patch catches `RuntimeException`. One concern is that it might also
shadow runtime exceptions caused by problems other than corrupt files.
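
The same concern applies to the `FileScanRDD` change, which wraps the call to
`readFunction`. Below is a minimal, self-contained illustration of that
pattern; the helper and its name are hypothetical and not part of this patch:

```scala
import java.io.IOException

// Wrap an eager reader factory so that corrupt-file errors turn into an empty
// iterator when ignoreCorruptFiles is enabled; otherwise they propagate.
def tolerantIterator[T](file: String, ignoreCorruptFiles: Boolean)(
    open: => Iterator[T]): Iterator[T] =
  try {
    open // may read the file eagerly, e.g. to parse a Parquet footer
  } catch {
    // Corruption can surface as RuntimeException (from readFooter) or
    // IOException, so both are handled; note that unrelated runtime errors are
    // also caught, which is the concern mentioned above.
    case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
      println(s"Skipping corrupted file: $file ($e)")
      Iterator.empty
  }

// Example: a reader factory that fails the way a corrupt Parquet footer would.
val rows = tolerantIterator[Int]("/tmp/corrupt.parquet", ignoreCorruptFiles = true) {
  throw new RuntimeException("/tmp/corrupt.parquet is not a Parquet file")
}
println(rows.toList) // List() -- the corrupt file is skipped
```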

## How was this patch tested?

Jenkins tests.


Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #16474 from viirya/fix-ignorecorrupted-parquet-files.

(cherry picked from commit 61e48f52d1d8c7431707bd3511b6fe9f0ae996c0)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f3ce062
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f3ce062
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f3ce062

Branch: refs/heads/branch-2.1
Commit: 4f3ce062ce2e9b403f9d38a44eb7fc76a800ed67
Parents: bf2f233
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Mon Jan 16 15:26:41 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Jan 16 15:27:01 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/UnionRDD.scala   |  2 +-
 .../sql/execution/datasources/FileScanRDD.scala | 12 +++-
 .../datasources/parquet/ParquetFileFormat.scala | 45 +++++++++++++--
 .../parquet/ParquetFileFormatSuite.scala        | 59 ++++++++++++++++++++
 .../datasources/parquet/ParquetQuerySuite.scala | 30 ++++++++++
 5 files changed, 140 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f3ce062/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index ad1fddb..60e383a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4f3ce062/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index b926b92..8434592 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -134,7 +134,17 @@ class FileScanRDD(
           try {
             if (ignoreCorruptFiles) {
               currentIterator = new NextIterator[Object] {
-                private val internalIter = readFunction(currentFile)
+                private val internalIter = {
+                  try {
+                    // The readFunction may read files before consuming the iterator.
+                    // E.g., vectorized Parquet reader.
+                    readFunction(currentFile)
+                  } catch {
+                    case e @(_: RuntimeException | _: IOException) =>
+                      logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+                      Iterator.empty
+                  }
+                }
 
                 override def getNext(): AnyRef = {
                   try {

http://git-wip-us.apache.org/repos/asf/spark/blob/4f3ce062/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 0965ffe..0e1fc7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.IOException
 import java.net.URI
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Try}
 
 import org.apache.hadoop.conf.Configuration
@@ -30,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
@@ -151,7 +155,7 @@ class ParquetFileFormat
     }
   }
 
-  def inferSchema(
+  override def inferSchema(
       sparkSession: SparkSession,
       parameters: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
@@ -548,6 +552,36 @@ object ParquetFileFormat extends Logging {
   }
 
   /**
+   * Reads Parquet footers in multi-threaded manner.
+   * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
+   * files when reading footers.
+   */
+  private[parquet] def readParquetFootersInParallel(
+      conf: Configuration,
+      partFiles: Seq[FileStatus],
+      ignoreCorruptFiles: Boolean): Seq[Footer] = {
+    val parFiles = partFiles.par
+    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    parFiles.flatMap { currentFile =>
+      try {
+        // Skips row group information since we only need the schema.
+        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+        // when it can't read the footer.
+        Some(new Footer(currentFile.getPath(),
+          ParquetFileReader.readFooter(
+            conf, currentFile, SKIP_ROW_GROUPS)))
+      } catch { case e: RuntimeException =>
+        if (ignoreCorruptFiles) {
+          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+          None
+        } else {
+          throw new IOException(s"Could not read footer for file: $currentFile", e)
+        }
+      }
+    }.seq
+  }
+
+  /**
    * Figures out a merged Parquet schema with a distributed Spark job.
    *
    * Note that locality is not taken into consideration here because:
@@ -587,6 +621,8 @@ object ParquetFileFormat extends Logging {
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
       sparkSession.sparkContext.defaultParallelism)
 
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
@@ -598,13 +634,10 @@ object ParquetFileFormat extends Logging {
             new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
           }.toSeq
 
-          // Skips row group information since we only need the schema
-          val skipRowGroups = true
-
           // Reads footers in multi-threaded manner within each task
           val footers =
-            ParquetFileReader.readAllFootersInParallel(
-              serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
+            ParquetFileFormat.readParquetFootersInParallel(
+              serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
 
           // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
           val converter =

http://git-wip-us.apache.org/repos/asf/spark/blob/4f3ce062/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
new file mode 100644
index 0000000..ccb3435
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  test("read parquet footers in parallel") {
+    def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
+      withTempDir { dir =>
+        val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+        val basePath = dir.getCanonicalPath
+
+        val path1 = new Path(basePath, "first")
+        val path2 = new Path(basePath, "second")
+        val path3 = new Path(basePath, "third")
+
+        spark.range(1).toDF("a").coalesce(1).write.parquet(path1.toString)
+        spark.range(1, 2).toDF("a").coalesce(1).write.parquet(path2.toString)
+        spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString)
+
+        val fileStatuses =
+          Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten
+
+        val footers = ParquetFileFormat.readParquetFootersInParallel(
+          sparkContext.hadoopConfiguration, fileStatuses, ignoreCorruptFiles)
+
+        assert(footers.size == 2)
+      }
+    }
+
+    testReadFooters(true)
+    val exception = intercept[java.io.IOException] {
+      testReadFooters(false)
+    }
+    assert(exception.getMessage().contains("Could not read footer for file"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4f3ce062/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 4c4a7d8..6132376 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -212,6 +213,35 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("Enabling/disabling ignoreCorruptFiles") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      testIgnoreCorruptFiles()
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      val exception = intercept[SparkException] {
+        testIgnoreCorruptFiles()
+      }
+      assert(exception.getMessage().contains("is not a Parquet file"))
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified 
options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath

