Repository: spark
Updated Branches:
  refs/heads/master 43d71d965 -> 01a8e4627
[SPARK-21769][SQL] Add a table-specific option for always respecting schemas inferred/controlled by Spark SQL

## What changes were proposed in this pull request?

For Hive-serde tables, we always respect the schema stored in the Hive metastore, because that schema could be altered by other engines sharing the same metastore. Thus, when the schemas differ (ignoring nullability and case), we always trust the metastore-controlled schema for Hive-serde tables. However, in some scenarios the Hive metastore can also INCORRECTLY overwrite the schema, for example when the table's serde differs from the Hive metastore's built-in serde.

The proposed solution is to introduce a table-specific option for such scenarios. For a specific table, users can make Spark always respect the Spark-inferred/controlled schema instead of trusting the metastore-controlled schema. By default, we trust the Hive metastore-controlled schema.

## How was this patch tested?

Added a cross-version test case.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #19003 from gatorsmile/respectSparkSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01a8e462
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01a8e462
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01a8e462

Branch: refs/heads/master
Commit: 01a8e46278dbfde916a74b6fd51e08804602e1cf
Parents: 43d71d9
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Tue Aug 22 13:12:59 2017 -0700
Committer: Sameer Agarwal <samee...@apache.org>
Committed: Tue Aug 22 13:12:59 2017 -0700

----------------------------------------------------------------------
 .../execution/datasources/SourceOptions.scala   | 50 +++++++++++++++++++
 .../spark/sql/hive/HiveExternalCatalog.scala    | 11 ++--
 .../src/test/resources/avroDecimal/decimal.avro | Bin 0 -> 203 bytes
 .../spark/sql/hive/client/VersionsSuite.scala   | 41 +++++++++++++++
 4 files changed, 97 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
new file mode 100644
index 0000000..c98c0b2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Options for the data source.
+ */
+class SourceOptions(
+    @transient private val parameters: CaseInsensitiveMap[String])
+  extends Serializable {
+  import SourceOptions._
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  // A flag to disable saving a data source table's metadata in hive compatible way.
+  val skipHiveMetadata: Boolean = parameters
+    .get(SKIP_HIVE_METADATA).map(_.toBoolean).getOrElse(DEFAULT_SKIP_HIVE_METADATA)
+
+  // A flag to always respect the Spark schema restored from the table properties
+  val respectSparkSchema: Boolean = parameters
+    .get(RESPECT_SPARK_SCHEMA).map(_.toBoolean).getOrElse(DEFAULT_RESPECT_SPARK_SCHEMA)
+}
+
+
+object SourceOptions {
+
+  val SKIP_HIVE_METADATA = "skipHiveMetadata"
+  val DEFAULT_SKIP_HIVE_METADATA = false
+
+  val RESPECT_SPARK_SCHEMA = "respectSparkSchema"
+  val DEFAULT_RESPECT_SPARK_SCHEMA = false
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index bdbb8bc..34af37c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -260,6 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
     // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
     val provider = table.provider.get
+    val options = new SourceOptions(table.storage.properties)
 
     // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra works before saving table
@@ -325,11 +326,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val qualifiedTableName = table.identifier.quotedString
     val maybeSerde = HiveSerDe.sourceToSerDe(provider)
-    val skipHiveMetadata = table.storage.properties
-      .getOrElse("skipHiveMetadata", "false").toBoolean
 
     val (hiveCompatibleTable, logMessage) = maybeSerde match {
-      case _ if skipHiveMetadata =>
+      case _ if options.skipHiveMetadata =>
         val message =
           s"Persisting data source table $qualifiedTableName into Hive metastore in" +
             "Spark SQL specific format, which is NOT compatible with Hive."
@@ -737,6 +736,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
+    val options = new SourceOptions(table.storage.properties)
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
@@ -748,7 +748,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
-      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
+          options.respectSparkSchema) {
         hiveTable.copy(
           schema = reorderedSchema,
           partitionColumnNames = partColumnNames,

http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/hive/src/test/resources/avroDecimal/decimal.avro
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/avroDecimal/decimal.avro b/sql/hive/src/test/resources/avroDecimal/decimal.avro
new file mode 100755
index 0000000..6da423f
Binary files /dev/null and b/sql/hive/src/test/resources/avroDecimal/decimal.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 072e538..cbbe869 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -763,6 +763,47 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }
 
+    test(s"$version: read avro file containing decimal") {
+      val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+      val location = new File(url.getFile)
+
+      val tableName = "tab1"
+      val avroSchema =
+        """{
+          |  "name": "test_record",
+          |  "type": "record",
+          |  "fields": [ {
+          |    "name": "f0",
+          |    "type": [
+          |      "null",
+          |      {
+          |        "precision": 38,
+          |        "scale": 2,
+          |        "type": "bytes",
+          |        "logicalType": "decimal"
+          |      }
+          |    ]
+          |  } ]
+          |}
+        """.stripMargin
+      withTable(tableName) {
+        versionSpark.sql(
+          s"""
+             |CREATE TABLE $tableName
+             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+             |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+             |STORED AS
+             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+             |LOCATION '$location'
+             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+        )
+        assert(versionSpark.table(tableName).collect() ===
+          versionSpark.sql("SELECT 1.30").collect())
+      }
+    }
+
     // TODO: add more tests.
   }
 }
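As a minimal sketch of how the new flag behaves (not taken from the commit itself; the object name and property values below are illustrative), SourceOptions reads the flags from a table's storage properties through CaseInsensitiveMap, so the key is matched case-insensitively and both flags default to false:

import org.apache.spark.sql.execution.datasources.SourceOptions

// Illustrative only: exercises the option parsing added by this commit.
object SourceOptionsSketch {
  def main(args: Array[String]): Unit = {
    // Storage properties as they would appear on a table created with
    // WITH SERDEPROPERTIES ('respectSparkSchema' = 'true'); the key casing is arbitrary.
    val optIn = new SourceOptions(Map("RespectSparkSchema" -> "true"))
    val defaults = new SourceOptions(Map.empty[String, String])

    assert(optIn.respectSparkSchema)      // matched case-insensitively
    assert(!defaults.respectSparkSchema)  // default: keep trusting the metastore-controlled schema
    assert(!defaults.skipHiveMetadata)    // the pre-existing flag keeps its default as well
  }
}

End users would normally set the property through WITH SERDEPROPERTIES (as in the VersionsSuite test above), which lands in the table's storage properties, rather than constructing SourceOptions directly.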