This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new cc508d1 [SPARK-34370][SQL] Support Avro schema evolution for partitioned Hive tables using "avro.schema.url" cc508d1 is described below commit cc508d17c7176bd3504c9f3af68794e1b98f8a81 Author: “attilapiros” <piros.attila.zs...@gmail.com> AuthorDate: Sat Feb 6 17:25:39 2021 -0800 [SPARK-34370][SQL] Support Avro schema evolution for partitioned Hive tables using "avro.schema.url" ### What changes were proposed in this pull request? With https://github.com/apache/spark/pull/31133 Avro schema evolution was introduced for partitioned Hive tables where the schema is given by `avro.schema.literal`. Here that functionality is extended to support schema evolution where the schema is defined via `avro.schema.url`. ### Why are the changes needed? Without this PR the problem described in https://github.com/apache/spark/pull/31133 can be reproduced by tables where `avro.schema.url` is used, because in this case the property value given at the partition level is always used for `avro.schema.url`. So for example when a new column (with a default value) is added to the table then one of the following problems happens: - when the new field is added after the last one the cell values will be null values instead of the default value - when the schema is extended somewhere before the last field then values will be listed for the wrong column positions A similar error will happen when one of the fields is removed from the schema. For details please check the attached unit tests where both cases are checked. ### Does this PR introduce _any_ user-facing change? Fixes the potential value error. ### How was this patch tested? The existing unit tests for schema evolution are generalized and reused. New tests: - `SPARK-34370: support Avro schema evolution (add column with avro.schema.url)` - `SPARK-34370: support Avro schema evolution (remove column with avro.schema.url)` Closes #31501 from attilapiros/SPARK-34370. 
Authored-by: “attilapiros” <piros.attila.zs...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/sql/hive/TableReader.scala | 4 +- .../src/test/resources/schemaWithOneField.avsc | 12 ++ .../src/test/resources/schemaWithTwoFields.avsc | 16 ++ .../spark/sql/hive/execution/HiveDDLSuite.scala | 174 ++++++++++++--------- 4 files changed, 132 insertions(+), 74 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 27fd2cc..96949a3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -240,6 +240,8 @@ class HadoopTableReader( fillPartitionKeys(partValues, mutableRow) val tableProperties = tableDesc.getProperties + val avroSchemaProperties = Seq(AvroTableProperties.SCHEMA_LITERAL, + AvroTableProperties.SCHEMA_URL).map(_.getPropName()) // Create local references so that the outer object isn't serialized. val localTableDesc = tableDesc @@ -256,7 +258,7 @@ class HadoopTableReader( // properties. 
val props = new Properties(tableProperties) partProps.asScala.filterNot { case (k, _) => - k == AvroTableProperties.SCHEMA_LITERAL.getPropName() && tableProperties.containsKey(k) + avroSchemaProperties.contains(k) && tableProperties.containsKey(k) }.foreach { case (key, value) => props.setProperty(key, value) } diff --git a/sql/hive/src/test/resources/schemaWithOneField.avsc b/sql/hive/src/test/resources/schemaWithOneField.avsc new file mode 100644 index 0000000..e6e2431 --- /dev/null +++ b/sql/hive/src/test/resources/schemaWithOneField.avsc @@ -0,0 +1,12 @@ +{ + "namespace": "test", + "name": "some_schema", + "type": "record", + "fields": [ + { + "name": "col2", + "type": "string" + } + ] +} + diff --git a/sql/hive/src/test/resources/schemaWithTwoFields.avsc b/sql/hive/src/test/resources/schemaWithTwoFields.avsc new file mode 100644 index 0000000..3d1d24c --- /dev/null +++ b/sql/hive/src/test/resources/schemaWithTwoFields.avsc @@ -0,0 +1,16 @@ +{ + "namespace": "test", + "name": "some_schema", + "type": "record", + "fields": [ + { + "name": "col1", + "type": "string", + "default": "col1_default" + }, + { + "name": "col2", + "type": "string" + } + ] +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 9f5b045..d8a3bf6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET} import org.apache.spark.sql.hive.orc.OrcFileOperator -import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHiveSparkSession} +import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton, TestHiveSparkSession} import 
org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION @@ -1853,104 +1853,132 @@ class HiveDDLSuite } } - test("SPARK-26836: support Avro schema evolution (add column)") { + test("SPARK-34370: support Avro schema evolution (add column with avro.schema.url)") { + checkAvroSchemaEvolutionAddColumn( + s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithOneField.avsc").toURI}'", + s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithTwoFields.avsc").toURI}'") + } + + test("SPARK-26836: support Avro schema evolution (add column with avro.schema.literal)") { + val originalSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + val evolvedSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col1", + | "type": "string", + | "default": "col1_default" + | }, + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + checkAvroSchemaEvolutionAddColumn( + s"'avro.schema.literal'='$originalSchema'", + s"'avro.schema.literal'='$evolvedSchema'") + } + + private def checkAvroSchemaEvolutionAddColumn( + originalSerdeProperties: String, + evolvedSerdeProperties: String) = { withTable("t") { - val originalSchema = - """ - |{ - | "namespace": "test", - | "name": "some_schema", - | "type": "record", - | "fields": [ - | { - | "name": "col2", - | "type": "string" - | } - | ] - |} - """.stripMargin sql( s""" |CREATE TABLE t PARTITIONED BY (ds string) |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' - |WITH SERDEPROPERTIES ('avro.schema.literal'='$originalSchema') + |WITH SERDEPROPERTIES ($originalSerdeProperties) |STORED AS |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' 
|OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' """.stripMargin) sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')") - val evolvedSchema = - """ - |{ - | "namespace": "test", - | "name": "some_schema", - | "type": "record", - | "fields": [ - | { - | "name": "col1", - | "type": "string", - | "default": "col1_default" - | }, - | { - | "name": "col2", - | "type": "string" - | } - | ] - |} - """.stripMargin - sql(s"ALTER TABLE t SET SERDEPROPERTIES ('avro.schema.literal'='$evolvedSchema')") + sql(s"ALTER TABLE t SET SERDEPROPERTIES ($evolvedSerdeProperties)") sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')") checkAnswer(spark.table("t"), Row("col1_default", "col2_value", "1981-01-07") :: Row("col1_value", "col2_value", "1983-04-27") :: Nil) } } - test("SPARK-26836: support Avro schema evolution (remove column)") { + test("SPARK-34370: support Avro schema evolution (remove column with avro.schema.url)") { + checkAvroSchemaEvolutionRemoveColumn( + s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithTwoFields.avsc").toURI}'", + s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithOneField.avsc").toURI}'") + } + + test("SPARK-26836: support Avro schema evolution (remove column with avro.schema.literal)") { + val originalSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col1", + | "type": "string", + | "default": "col1_default" + | }, + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + val evolvedSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + checkAvroSchemaEvolutionRemoveColumn( + s"'avro.schema.literal'='$originalSchema'", + s"'avro.schema.literal'='$evolvedSchema'") + } + + private def checkAvroSchemaEvolutionRemoveColumn( + 
originalSerdeProperties: String, + evolvedSerdeProperties: String) = { withTable("t") { - val originalSchema = - """ - |{ - | "namespace": "test", - | "name": "some_schema", - | "type": "record", - | "fields": [ - | { - | "name": "col1", - | "type": "string", - | "default": "col1_default" - | }, - | { - | "name": "col2", - | "type": "string" - | } - | ] - |} - """.stripMargin sql( s""" |CREATE TABLE t PARTITIONED BY (ds string) |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' - |WITH SERDEPROPERTIES ('avro.schema.literal'='$originalSchema') + |WITH SERDEPROPERTIES ($originalSerdeProperties) |STORED AS |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' """.stripMargin) sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')") - val evolvedSchema = - """ - |{ - | "namespace": "test", - | "name": "some_schema", - | "type": "record", - | "fields": [ - | { - | "name": "col2", - | "type": "string" - | } - | ] - |} - """.stripMargin - sql(s"ALTER TABLE t SET SERDEPROPERTIES ('avro.schema.literal'='$evolvedSchema')") + sql(s"ALTER TABLE t SET SERDEPROPERTIES ($evolvedSerdeProperties)") sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')") checkAnswer(spark.table("t"), Row("col2_value", "1981-01-07") :: Row("col2_value", "1983-04-27") :: Nil) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org