spark git commit: [SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1
Repository: spark Updated Branches: refs/heads/branch-2.2 42cc83082 -> 9bdc83590 [SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1 ### What changes were proposed in this pull request? Before the PR, Spark is unable to read the partitioned table created by Spark 2.1 when the table schema does not put the partitioning column at the end of the schema. [assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236) When reading the table metadata from the metastore, we also need to reorder the columns. ### How was this patch tested? Added test cases to check both Hive-serde and data source tables. Author: gatorsmileCloses #18295 from gatorsmile/reorderReadSchema. (cherry picked from commit 0c88e8d37224713199ca5661c2cd57f5918dcb9a) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bdc8359 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bdc8359 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bdc8359 Branch: refs/heads/branch-2.2 Commit: 9bdc83590922e0e1f22424904411acb0d1b37a11 Parents: 42cc830 Author: gatorsmile Authored: Wed Jun 14 16:28:06 2017 +0800 Committer: Wenchen Fan Committed: Wed Jun 14 16:28:21 2017 +0800 -- .../spark/sql/hive/HiveExternalCatalog.scala| 31 .../sql/hive/HiveExternalCatalogSuite.scala | 26 2 files changed, 52 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9bdc8359/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index ba48fac..a03beb7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -717,6 +717,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }) } + // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition + // columns are not put at the end of schema. We need to reorder it when reading the schema + // from the table properties. + private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = { +val partitionFields = partColumnNames.map { partCol => + schema.find(_.name == partCol).getOrElse { +throw new AnalysisException("The metadata is corrupted. Unable to find the " + + s"partition column names from the schema. schema: ${schema.catalogString}. " + + s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}") + } +} +StructType(schema.filterNot(partitionFields.contains) ++ partitionFields) + } + private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = { val hiveTable = table.copy( provider = Some(DDLUtils.HIVE_PROVIDER), @@ -726,10 +740,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // schema from table properties. if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) { val schemaFromTableProps = getSchemaFromTableProperties(table) - if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) { + val partColumnNames = getPartitionColumnsFromTableProperties(table) + val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames) + + if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) { hiveTable.copy( - schema = schemaFromTableProps, - partitionColumnNames = getPartitionColumnsFromTableProperties(table), + schema = reorderedSchema, + partitionColumnNames = partColumnNames, bucketSpec = getBucketSpecFromTableProperties(table)) } else { // Hive metastore may change the table schema, e.g. schema inference. If the table @@ -759,11 +776,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER) +val schemaFromTableProps = getSchemaFromTableProperties(table) +val partColumnNames = getPartitionColumnsFromTableProperties(table) +val reorderedSchema =
spark git commit: [SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1
Repository: spark Updated Branches: refs/heads/master d6f76eb34 -> 0c88e8d37 [SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1 ### What changes were proposed in this pull request? Before the PR, Spark is unable to read the partitioned table created by Spark 2.1 when the table schema does not put the partitioning column at the end of the schema. [assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236) When reading the table metadata from the metastore, we also need to reorder the columns. ### How was this patch tested? Added test cases to check both Hive-serde and data source tables. Author: gatorsmileCloses #18295 from gatorsmile/reorderReadSchema. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c88e8d3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c88e8d3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c88e8d3 Branch: refs/heads/master Commit: 0c88e8d37224713199ca5661c2cd57f5918dcb9a Parents: d6f76eb Author: gatorsmile Authored: Wed Jun 14 16:28:06 2017 +0800 Committer: Wenchen Fan Committed: Wed Jun 14 16:28:06 2017 +0800 -- .../spark/sql/hive/HiveExternalCatalog.scala| 31 .../sql/hive/HiveExternalCatalogSuite.scala | 26 2 files changed, 52 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0c88e8d3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 7fcf06d..1945367 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -729,6 +729,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }) } + // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition + // columns are not put at the end of schema. We need to reorder it when reading the schema + // from the table properties. + private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = { +val partitionFields = partColumnNames.map { partCol => + schema.find(_.name == partCol).getOrElse { +throw new AnalysisException("The metadata is corrupted. Unable to find the " + + s"partition column names from the schema. schema: ${schema.catalogString}. " + + s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}") + } +} +StructType(schema.filterNot(partitionFields.contains) ++ partitionFields) + } + private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = { val hiveTable = table.copy( provider = Some(DDLUtils.HIVE_PROVIDER), @@ -738,10 +752,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // schema from table properties. if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) { val schemaFromTableProps = getSchemaFromTableProperties(table) - if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) { + val partColumnNames = getPartitionColumnsFromTableProperties(table) + val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames) + + if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) { hiveTable.copy( - schema = schemaFromTableProps, - partitionColumnNames = getPartitionColumnsFromTableProperties(table), + schema = reorderedSchema, + partitionColumnNames = partColumnNames, bucketSpec = getBucketSpecFromTableProperties(table)) } else { // Hive metastore may change the table schema, e.g. schema inference. If the table @@ -771,11 +788,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER) +val schemaFromTableProps = getSchemaFromTableProperties(table) +val partColumnNames = getPartitionColumnsFromTableProperties(table) +val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames) + table.copy( provider = Some(provider), storage =