spark git commit: [SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 42cc83082 -> 9bdc83590


[SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

### What changes were proposed in this pull request?
Before this PR, Spark cannot read a partitioned table created by Spark 2.1 when
the table schema does not put the partitioning columns at the end of the
schema, because of this assertion:
[assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236)

When reading the table metadata from the metastore, we also need to reorder the 
columns.
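
For context, a minimal reproduction sketch of the failing scenario (illustrative only; the table and column names are hypothetical, and it assumes the table is created with Spark 2.1 and then read with Spark 2.2 before this fix):

```scala
// Hypothetical reproduction, assuming a Hive-enabled SparkSession named `spark`.
// Step 1 (Spark 2.1): declare the partition column before a regular column, so the
// schema recorded in the metastore table properties is (p, a) rather than (a, p).
spark.sql("CREATE TABLE t (p INT, a STRING) USING parquet PARTITIONED BY (p)")

// Step 2 (Spark 2.2, before this patch): reading the table back trips the assertion
// linked above, because CatalogTable expects partition columns at the end of the schema.
spark.table("t").show()
```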

### How was this patch tested?
Added test cases to check both Hive-serde and data source tables.
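
The property those tests verify can be distilled into a tiny standalone check on the reordering itself (a sketch only, not the suite's code; the schema and column names are made up):

```scala
import org.apache.spark.sql.types.StructType

// Schema as Spark 2.1 may have stored it: partition column "p" is not last.
val stored = new StructType().add("p", "int").add("a", "string").add("b", "string")
val partColumnNames = Seq("p")

// Same idea as the new reorderSchema helper: move the partition columns to the end.
val partitionFields = partColumnNames.flatMap(name => stored.find(_.name == name))
val reordered = StructType(stored.filterNot(partitionFields.contains) ++ partitionFields)

// Restored table metadata should report the partition column as the last field.
assert(reordered.fieldNames.toSeq == Seq("a", "b", "p"))
```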

Author: gatorsmile 

Closes #18295 from gatorsmile/reorderReadSchema.

(cherry picked from commit 0c88e8d37224713199ca5661c2cd57f5918dcb9a)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bdc8359
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bdc8359
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bdc8359

Branch: refs/heads/branch-2.2
Commit: 9bdc83590922e0e1f22424904411acb0d1b37a11
Parents: 42cc830
Author: gatorsmile 
Authored: Wed Jun 14 16:28:06 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Jun 14 16:28:21 2017 +0800

--
 .../spark/sql/hive/HiveExternalCatalog.scala| 31 
 .../sql/hive/HiveExternalCatalogSuite.scala | 26 
 2 files changed, 52 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9bdc8359/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ba48fac..a03beb7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -717,6 +717,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition
+  // columns are not put at the end of schema. We need to reorder it when reading the schema
+  // from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names from the schema. schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -726,10 +740,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
         // Hive metastore may change the table schema, e.g. schema inference. If the table
@@ -759,11 +776,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

spark git commit: [SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master d6f76eb34 -> 0c88e8d37


[SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

### What changes were proposed in this pull request?
Before the PR, Spark is unable to read the partitioned table created by Spark 
2.1 when the table schema does not put the partitioning column at the end of 
the schema.
[assert(partitionFields.map(_.name) == 
partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236)

When reading the table metadata from the metastore, we also need to reorder the 
columns.

### How was this patch tested?
Added test cases to check both Hive-serde and data source tables.

Author: gatorsmile 

Closes #18295 from gatorsmile/reorderReadSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c88e8d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c88e8d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c88e8d3

Branch: refs/heads/master
Commit: 0c88e8d37224713199ca5661c2cd57f5918dcb9a
Parents: d6f76eb
Author: gatorsmile 
Authored: Wed Jun 14 16:28:06 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Jun 14 16:28:06 2017 +0800

--
 .../spark/sql/hive/HiveExternalCatalog.scala| 31 
 .../sql/hive/HiveExternalCatalogSuite.scala | 26 
 2 files changed, 52 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c88e8d3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7fcf06d..1945367 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -729,6 +729,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition
+  // columns are not put at the end of schema. We need to reorder it when reading the schema
+  // from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names from the schema. schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -738,10 +752,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
         // Hive metastore may change the table schema, e.g. schema inference. If the table
@@ -771,11 +788,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
     table.copy(
       provider = Some(provider),
       storage =