Repository: spark
Updated Branches:
  refs/heads/master 89e67d666 -> 81c68eceb


[SPARK-15248][SQL] Make MetastoreFileCatalog consider directories from 
partition specs of a partitioned metastore table

Table partitions can be added with locations different from default warehouse 
location of a hive table.
`CREATE TABLE parquetTable (a int) PARTITIONED BY (b int) STORED AS parquet `
`ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '/partition'`
Querying such a table throws error as the MetastoreFileCatalog does not list 
the added partition directory, it only lists the default base location.

```
[info] - SPARK-15248: explicitly added partitions should be readable *** FAILED 
*** (1 second, 8 milliseconds)
[info]   java.util.NoSuchElementException: key not found: 
file:/Users/tdas/Projects/Spark/spark2/target/tmp/spark-b39ad224-c5d1-4966-8981-fb45a2066d61/partition
[info]   at scala.collection.MapLike$class.default(MapLike.scala:228)
[info]   at scala.collection.AbstractMap.default(Map.scala:59)
[info]   at scala.collection.MapLike$class.apply(MapLike.scala:141)
[info]   at scala.collection.AbstractMap.apply(Map.scala:59)
[info]   at 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:59)
[info]   at 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:55)
[info]   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
[info]   at 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog.listFiles(PartitioningAwareFileCatalog.scala:55)
[info]   at 
org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:93)
[info]   at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
[info]   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
[info]   at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60)
[info]   at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:55)
[info]   at 
org.apache.spark.sql.execution.SparkStrategies$SpecialLimits$.apply(SparkStrategies.scala:55)
[info]   at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
[info]   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
[info]   at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60)
[info]   at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:77)
[info]   at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
[info]   at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:82)
[info]   at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:82)
[info]   at 
org.apache.spark.sql.QueryTest.assertEmptyMissingInput(QueryTest.scala:330)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:146)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:159)
[info]   at 
org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:554)
[info]   at 
org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:535)
[info]   at 
org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:125)
[info]   at 
org.apache.spark.sql.hive.ParquetPartitioningTest.withTempDir(parquetSuites.scala:726)
[info]   at 
org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7.apply$mcV$sp(parquetSuites.scala:535)
[info]   at 
org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:166)
[info]   at 
org.apache.spark.sql.hive.ParquetPartitioningTest.withTable(parquetSuites.scala:726)
[info]   at 
org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply$mcV$sp(parquetSuites.scala:534)
[info]   at 
org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply(parquetSuites.scala:534)
[info]   at 
org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply(parquetSuites.scala:534)
```

The solution in this PR to get the paths to list from the partition spec and 
not rely on the default table path alone.

unit tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #13022 from tdas/SPARK-15248.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81c68ece
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81c68ece
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81c68ece

Branch: refs/heads/master
Commit: 81c68eceba3a857ba7349c6892dc336c3ebd11dc
Parents: 89e67d6
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed May 11 12:35:41 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed May 11 12:36:25 2016 -0700

----------------------------------------------------------------------
 .../PartitioningAwareFileCatalog.scala          | 13 ++++++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 31 +++++++++++++-----
 .../apache/spark/sql/hive/parquetSuites.scala   | 34 ++++++++++++++++++++
 3 files changed, 67 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/81c68ece/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 27f23c8..e0e4ddc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -54,9 +54,16 @@ abstract class PartitioningAwareFileCatalog(
     } else {
       prunePartitions(filters, partitionSpec()).map {
         case PartitionDirectory(values, path) =>
-          Partition(
-            values,
-            leafDirToChildrenFiles(path).filterNot(_.getPath.getName 
startsWith "_"))
+          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
+            case Some(existingDir) =>
+              // Directory has children files in it, return them
+              existingDir.filterNot(_.getPath.getName.startsWith("_"))
+
+            case None =>
+              // Directory does not exist, or has no children files
+              Nil
+          }
+          Partition(values, files)
       }
     }
     logTrace("Selected files after partition pruning:\n\t" + 
selectedPartitions.mkString("\n\t"))

http://git-wip-us.apache.org/repos/asf/spark/blob/81c68ece/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 607f0a1..b0a3a80 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -271,8 +271,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
         Some(partitionSpec))
 
       val hadoopFsRelation = cached.getOrElse {
-        val paths = new 
Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
-        val fileCatalog = new MetaStoreFileCatalog(sparkSession, paths, 
partitionSpec)
+        val fileCatalog = new MetaStorePartitionedTableFileCatalog(
+          sparkSession,
+          new Path(metastoreRelation.catalogTable.storage.locationUri.get),
+          partitionSpec)
 
         val inferredSchema = if (fileType.equals("parquet")) {
           val inferredSchema =
@@ -537,18 +539,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 /**
  * An override of the standard HDFS listing based catalog, that overrides the 
partition spec with
  * the information from the metastore.
+ * @param tableBasePath The default base path of the Hive metastore table
+ * @param partitionSpec The partition specifications from Hive metastore
  */
-private[hive] class MetaStoreFileCatalog(
+private[hive] class MetaStorePartitionedTableFileCatalog(
     sparkSession: SparkSession,
-    paths: Seq[Path],
-    partitionSpecFromHive: PartitionSpec)
+    tableBasePath: Path,
+    override val partitionSpec: PartitionSpec)
   extends ListingFileCatalog(
     sparkSession,
-    paths,
+    MetaStorePartitionedTableFileCatalog.getPaths(tableBasePath, 
partitionSpec),
     Map.empty,
-    Some(partitionSpecFromHive.partitionColumns)) {
+    Some(partitionSpec.partitionColumns)) {
+}
 
-  override def partitionSpec(): PartitionSpec = partitionSpecFromHive
+private[hive] object MetaStorePartitionedTableFileCatalog {
+  /** Get the list of paths to list files in the for a metastore table */
+  def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = 
{
+    // If there are no partitions currently specified then use base path,
+    // otherwise use the paths corresponding to the partitions.
+    if (partitionSpec.partitions.isEmpty) {
+      Seq(tableBasePath)
+    } else {
+      partitionSpec.partitions.map(_.path)
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/81c68ece/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 6e93bbd..f52c6e4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -529,6 +529,40 @@ class ParquetMetastoreSuite extends 
ParquetPartitioningTest {
 
     dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
   }
+
+  test("SPARK-15248: explicitly added partitions should be readable") {
+    withTable("test_added_partitions", "test_temp") {
+      withTempDir { src =>
+        val partitionDir = new File(src, "partition").getCanonicalPath
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+
+        // Temp table to insert data into partitioned table
+        Seq("foo", "bar").toDF("a").registerTempTable("test_temp")
+        sql("INSERT INTO test_added_partitions PARTITION(b='0') SELECT a FROM 
test_temp")
+
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION 
'$partitionDir'")
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+
+        // Add data files to partition directory and check whether they can be 
read
+        
Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+      }
+    }
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to