Repository: spark
Updated Branches:
  refs/heads/branch-2.2 42cc83082 -> 9bdc83590


[SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

### What changes were proposed in this pull request?
Before this PR, Spark was unable to read a partitioned table created by Spark 
2.1 when the table schema did not put the partitioning columns at the end of 
the schema; reading such a table fails this assertion:
[assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236)
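
For context, that assertion requires the trailing fields of the schema to be 
exactly the partition columns. A minimal standalone sketch of the invariant 
(simplified; not the exact catalyst code):

```scala
import org.apache.spark.sql.types.StructType

// Simplified view of CatalogTable.partitionSchema: the partition columns are
// expected to be the last fields of the schema, in their declared order.
def partitionSchema(schema: StructType, partitionColumnNames: Seq[String]): StructType = {
  val partitionFields = schema.takeRight(partitionColumnNames.length)
  assert(partitionFields.map(_.name) == partitionColumnNames)
  StructType(partitionFields)
}

// A table written by Spark 2.1 may keep a partition column in the middle of
// the schema, so the assertion fails when the table is read back:
val legacySchema = new StructType()
  .add("col1", "int")
  .add("partCol1", "int")
  .add("col2", "string")
// partitionSchema(legacySchema, Seq("partCol1"))  // would throw an AssertionError
```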

When reading the table metadata back from the metastore, we therefore need to 
reorder the columns so that the partition columns come last.
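
The reordering added by this patch behaves roughly like the self-contained 
sketch below (simplified: the real `reorderSchema` in `HiveExternalCatalog` 
also raises an AnalysisException when a partition column is missing from the 
schema):

```scala
import org.apache.spark.sql.types.StructType

// Move the partition columns to the end of the schema, keeping their declared order.
def reorder(schema: StructType, partColumnNames: Seq[String]): StructType = {
  val partitionFields = partColumnNames.map(name => schema(name))
  StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
}

val legacySchema = new StructType()
  .add("col1", "int")
  .add("partCol1", "int")
  .add("col2", "string")

reorder(legacySchema, Seq("partCol1"))
// => StructType(StructField(col1,IntegerType,true),
//               StructField(col2,StringType,true),
//               StructField(partCol1,IntegerType,true))
```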

### How was this patch tested?
Added test cases to check both Hive-serde and data source tables.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #18295 from gatorsmile/reorderReadSchema.

(cherry picked from commit 0c88e8d37224713199ca5661c2cd57f5918dcb9a)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bdc8359
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bdc8359
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bdc8359

Branch: refs/heads/branch-2.2
Commit: 9bdc83590922e0e1f22424904411acb0d1b37a11
Parents: 42cc830
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Wed Jun 14 16:28:06 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Jun 14 16:28:21 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveExternalCatalog.scala    | 31 ++++++++++++++++----
 .../sql/hive/HiveExternalCatalogSuite.scala     | 26 ++++++++++++++++
 2 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9bdc8359/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ba48fac..a03beb7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -717,6 +717,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition
+  // columns are not put at the end of schema. We need to reorder it when reading the schema
+  // from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names from the schema. schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -726,10 +740,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
         // Hive metastore may change the table schema, e.g. schema inference. If the table
@@ -759,11 +776,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
     table.copy(
       provider = Some(provider),
       storage = storageWithLocation,
-      schema = getSchemaFromTableProperties(table),
-      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      schema = reorderedSchema,
+      partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9bdc8359/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index bd54c04..d43534d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -63,4 +63,30 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
     assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
   }
+
+  Seq("parquet", "hive").foreach { format =>
+    test(s"Partition columns should be put at the end of table schema for the format $format") {
+      val catalog = newBasicCatalog()
+      val newSchema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string")
+      val table = CatalogTable(
+        identifier = TableIdentifier("tbl", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType()
+          .add("col1", "int")
+          .add("partCol1", "int")
+          .add("partCol2", "string")
+          .add("col2", "string"),
+        provider = Some(format),
+        partitionColumnNames = Seq("partCol1", "partCol2"))
+      catalog.createTable(table, ignoreIfExists = false)
+
+      val restoredTable = externalCatalog.getTable("db1", "tbl")
+      assert(restoredTable.schema == newSchema)
+    }
+  }
 }

