Repository: spark
Updated Branches:
  refs/heads/branch-2.0 515937046 -> e03c25193


[SPARK-15895][SQL] Filters out metadata files while doing partition discovery

## What changes were proposed in this pull request?

Take the following directory layout as an example:

```
dir/
+- p0=0/
   |- _metadata
   +- p1=0/
      |- part-00001.parquet
      |- part-00002.parquet
      |- ...
```

The `_metadata` file under `p0=0` shouldn't fail partition discovery.

This PR filters out all metadata and temporary files, i.e. those whose names
start with `_` or `.`, while doing partition discovery.
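
The name check is centralized in a new `isDataPath` helper; here is a minimal standalone sketch of that predicate, mirroring the patch below:

```scala
import org.apache.hadoop.fs.Path

// A path counts as a data file only if its name starts with neither an
// underscore (metadata files such as _metadata and _common_metadata) nor a
// dot (hidden and temporary files such as .dummy).
def isDataPath(path: Path): Boolean = {
  val name = path.getName
  !(name.startsWith("_") || name.startsWith("."))
}
```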

## How was this patch tested?

New unit test added in `ParquetPartitionDiscoverySuite`.
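
For reference, a condensed sketch of the scenario the new test exercises (assuming a `SparkSession` named `spark` and the `withTempPath`/`withSQLConf` helpers available in Spark's test suites):

```scala
import java.io.File

import com.google.common.io.Files
import org.apache.parquet.hadoop.ParquetOutputFormat

withTempPath { dir =>
  val path = dir.getCanonicalPath

  // Write a partitioned dataset with Parquet summary files enabled.
  withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
    spark.range(3).write.parquet(s"$path/p0=0/p1=0")
  }

  // Copy a summary file into the non-leaf directory `p0=0`, which used to
  // break partition discovery.
  Files.copy(new File(s"$path/p0=0/p1=0/_metadata"), new File(s"$path/p0=0/_metadata"))

  // Reading the root path should still discover partitions p0 and p1.
  spark.read.parquet(path).show()
}
```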

Author: Cheng Lian <l...@databricks.com>

Closes #13623 from liancheng/spark-15895-partition-disco-no-metafiles.

(cherry picked from commit bd39ffe35c6f939debe5d3c5eb4970b4e62507b0)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e03c2519
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e03c2519
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e03c2519

Branch: refs/heads/branch-2.0
Commit: e03c25193b4a939edbd6e9efc4f46272defd7f0e
Parents: 5159370
Author: Cheng Lian <l...@databricks.com>
Authored: Tue Jun 14 12:13:12 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Jun 14 12:13:35 2016 -0700

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala        |  7 ++--
 .../PartitioningAwareFileCatalog.scala          | 15 +++++--
 .../ParquetPartitionDiscoverySuite.scala        | 44 ++++++++++++++++++++
 3 files changed, 60 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e03c2519/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 7d2854a..d96cf1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable
 import scala.util.Try
 
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.SparkSession
@@ -83,8 +83,9 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
-          getOrElse(Array.empty)
+        Try {
+          HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
+        }.getOrElse(Array.empty[FileStatus])
       }
       mutable.LinkedHashSet(statuses: _*)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/e03c2519/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 406d2e8..811e96c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -50,14 +50,14 @@ abstract class PartitioningAwareFileCatalog(
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
+      Partition(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
     } else {
       prunePartitions(filters, partitionSpec()).map {
         case PartitionDirectory(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
               // Directory has children files in it, return them
-              existingDir.filterNot(_.getPath.getName.startsWith("_"))
+              existingDir.filter(f => isDataPath(f.getPath))
 
             case None =>
               // Directory does not exist, or has no children files
@@ -96,7 +96,11 @@ abstract class PartitioningAwareFileCatalog(
 
   protected def inferPartitioning(): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.keys.toSeq
+    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
+      // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+      // counted as data files, so that they don't participate in partition discovery.
+      files.exists(f => isDataPath(f.getPath))
+    }.keys.toSeq
     partitionSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
@@ -197,4 +201,9 @@ abstract class PartitioningAwareFileCatalog(
          if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
     }
   }
+
+  private def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !(name.startsWith("_") || name.startsWith("."))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e03c2519/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index e193455..133ffed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -25,11 +25,13 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -890,4 +892,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-15895 summary files in non-leaf partition directories") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+        spark.range(3).write.parquet(s"$path/p0=0/p1=0")
+      }
+
+      val p0 = new File(path, "p0=0")
+      val p1 = new File(p0, "p1=0")
+
+      // Builds the following directory layout by:
+      //
+      //  1. copying Parquet summary files we just wrote into `p0=0`, and
+      //  2. touching a dot-file `.dummy` under `p0=0`.
+      //
+      // <base>
+      // +- p0=0
+      //    |- _metadata
+      //    |- _common_metadata
+      //    |- .dummy
+      //    +- p1=0
+      //       |- _metadata
+      //       |- _common_metadata
+      //       |- part-00000.parquet
+      //       |- part-00001.parquet
+      //       +- ...
+      //
+      // The summary files and the dot-file under `p0=0` should not fail partition discovery.
+
+      Files.copy(new File(p1, "_metadata"), new File(p0, "_metadata"))
+      Files.copy(new File(p1, "_common_metadata"), new File(p0, "_common_metadata"))
+      Files.touch(new File(p0, ".dummy"))
+
+      checkAnswer(spark.read.parquet(s"$path"), Seq(
+        Row(0, 0, 0),
+        Row(1, 0, 0),
+        Row(2, 0, 0)
+      ))
+    }
+  }
 }

