Repository: spark
Updated Branches:
  refs/heads/branch-2.0 515937046 -> e03c25193
[SPARK-15895][SQL] Filters out metadata files while doing partition discovery

## What changes were proposed in this pull request?

Take the following directory layout as an example:

```
dir/
+- p0=0/
|  |- _metadata
+- p1=0/
   |- part-00001.parquet
   |- part-00002.parquet
   |- ...
```

The `_metadata` file under `p0=0` shouldn't fail partition discovery. This PR filters out all metadata files whose names start with `_` while doing partition discovery.

## How was this patch tested?

New unit test added in `ParquetPartitionDiscoverySuite`.

Author: Cheng Lian <l...@databricks.com>

Closes #13623 from liancheng/spark-15895-partition-disco-no-metafiles.

(cherry picked from commit bd39ffe35c6f939debe5d3c5eb4970b4e62507b0)
Signed-off-by: Yin Huai <yh...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e03c2519
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e03c2519
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e03c2519

Branch: refs/heads/branch-2.0
Commit: e03c25193b4a939edbd6e9efc4f46272defd7f0e
Parents: 5159370
Author: Cheng Lian <l...@databricks.com>
Authored: Tue Jun 14 12:13:12 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Jun 14 12:13:35 2016 -0700

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala           |  7 ++--
 .../PartitioningAwareFileCatalog.scala             | 15 +++++--
 .../ParquetPartitionDiscoverySuite.scala           | 44 ++++++++++++++++++++
 3 files changed, 60 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e03c2519/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 7d2854a..d96cf1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable
 import scala.util.Try
 
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.SparkSession
@@ -83,8 +83,9 @@ class ListingFileCatalog(
     val statuses: Seq[FileStatus] = paths.flatMap { path =>
       val fs = path.getFileSystem(hadoopConf)
       logInfo(s"Listing $path on driver")
-      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
-        getOrElse(Array.empty)
+      Try {
+        HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
+      }.getOrElse(Array.empty[FileStatus])
     }
     mutable.LinkedHashSet(statuses: _*)
   }
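The `Try { ... }.getOrElse(Array.empty[FileStatus])` rewrite keeps the old behavior (a path that cannot be resolved simply contributes no files) while making the fallback's element type explicit. Below is a minimal standalone sketch of the same defensive-listing pattern, using plain Hadoop `FileSystem` calls instead of Spark's internal `HadoopFsRelation.listLeafFiles`; the `SafeListing` object and `listOrEmpty` method are illustrative names, not Spark code:

```
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Hypothetical helper, not Spark code: returns an empty listing instead of
// propagating FileNotFoundException and friends, mirroring the patched
// Try { ... }.getOrElse(Array.empty[FileStatus]) shape above.
object SafeListing {
  def listOrEmpty(fs: FileSystem, path: Path): Array[FileStatus] =
    Try {
      fs.listStatus(fs.getFileStatus(path).getPath)
    }.getOrElse(Array.empty[FileStatus])

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    // A nonexistent path yields 0 files rather than an exception.
    println(listOrEmpty(fs, new Path("/no/such/dir")).length)
  }
}
```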
http://git-wip-us.apache.org/repos/asf/spark/blob/e03c2519/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 406d2e8..811e96c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -50,14 +50,14 @@ abstract class PartitioningAwareFileCatalog(
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
+      Partition(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
     } else {
       prunePartitions(filters, partitionSpec()).map {
         case PartitionDirectory(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
               // Directory has children files in it, return them
-              existingDir.filterNot(_.getPath.getName.startsWith("_"))
+              existingDir.filter(f => isDataPath(f.getPath))
 
             case None =>
               // Directory does not exist, or has no children files
@@ -96,7 +96,11 @@ abstract class PartitioningAwareFileCatalog(
 
   protected def inferPartitioning(): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.keys.toSeq
+    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
+      // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+      // counted as data files, so that they shouldn't participate in partition discovery.
+      files.exists(f => isDataPath(f.getPath))
+    }.keys.toSeq
     partitionSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
@@ -197,4 +201,9 @@ abstract class PartitioningAwareFileCatalog(
         if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath
       }.toSet
     }
   }
+
+  private def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !(name.startsWith("_") || name.startsWith("."))
+  }
 }
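The new `isDataPath` predicate is the heart of the fix: `listFiles` and `inferPartitioning` now agree on what counts as a data file. It can be exercised standalone; in the sketch below the predicate body is copied from the patch, while the surrounding demo object and sample paths are illustrative:

```
import org.apache.hadoop.fs.Path

object IsDataPathDemo {
  // Same logic as the helper added above: names starting with "_"
  // (e.g. Parquet summary files) or "." (hidden/temporary files)
  // are not data files.
  def isDataPath(path: Path): Boolean = {
    val name = path.getName
    !(name.startsWith("_") || name.startsWith("."))
  }

  def main(args: Array[String]): Unit = {
    Seq(
      "/t/p0=0/_metadata",                // false: summary file
      "/t/p0=0/_common_metadata",         // false: summary file
      "/t/p0=0/.dummy",                   // false: dot-file
      "/t/p0=0/p1=0/part-00001.parquet"   // true: data file
    ).foreach { s =>
      println(s"$s -> ${isDataPath(new Path(s))}")
    }
  }
}
```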
http://git-wip-us.apache.org/repos/asf/spark/blob/e03c2519/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index e193455..133ffed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -25,11 +25,13 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -890,4 +892,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-15895 summary files in non-leaf partition directories") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+        spark.range(3).write.parquet(s"$path/p0=0/p1=0")
+      }
+
+      val p0 = new File(path, "p0=0")
+      val p1 = new File(p0, "p1=0")
+
+      // Builds the following directory layout by:
+      //
+      //  1. copying Parquet summary files we just wrote into `p0=0`, and
+      //  2. touching a dot-file `.dummy` under `p0=0`.
+      //
+      // <base>
+      // +- p0=0
+      //    |- _metadata
+      //    |- _common_metadata
+      //    |- .dummy
+      //    +- p1=0
+      //       |- _metadata
+      //       |- _common_metadata
+      //       |- part-00000.parquet
+      //       |- part-00001.parquet
+      //       +- ...
+      //
+      // The summary files and the dot-file under `p0=0` should not fail partition discovery.
+
+      Files.copy(new File(p1, "_metadata"), new File(p0, "_metadata"))
+      Files.copy(new File(p1, "_common_metadata"), new File(p0, "_common_metadata"))
+      Files.touch(new File(p0, ".dummy"))
+
+      checkAnswer(spark.read.parquet(s"$path"), Seq(
+        Row(0, 0, 0),
+        Row(1, 0, 0),
+        Row(2, 0, 0)
+      ))
+    }
+  }
 }
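For readers who want to reproduce the scenario outside the test harness, here is a minimal self-contained sketch. The object name, temp-directory handling, and local-mode session are illustrative assumptions, not part of the patch; the config key is the value behind `ParquetOutputFormat.ENABLE_JOB_SUMMARY` used in the test above:

```
import java.nio.file.{Files, Paths, StandardCopyOption}

import org.apache.spark.sql.SparkSession

// Hypothetical demo object, not part of the patch.
object Spark15895Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("spark-15895-demo")
      .getOrCreate()

    val base = Files.createTempDirectory("spark-15895").toString

    // Enable Parquet summary files for the write, as the test does via
    // ParquetOutputFormat.ENABLE_JOB_SUMMARY.
    spark.sparkContext.hadoopConfiguration
      .set("parquet.enable.summary-metadata", "true")
    spark.range(3).write.parquet(s"$base/p0=0/p1=0")

    // Plant a summary file in the non-leaf directory `p0=0`, mimicking
    // the layout the new test constructs.
    Files.copy(
      Paths.get(base, "p0=0", "p1=0", "_metadata"),
      Paths.get(base, "p0=0", "_metadata"),
      StandardCopyOption.REPLACE_EXISTING)

    // Without the patch, the stray summary file made `p0=0` look like a
    // leaf data directory and partition inference failed; with it, this
    // read succeeds and discovers p0 and p1 as partition columns.
    spark.read.parquet(base).show()

    spark.stop()
  }
}
```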