This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cc508d1  [SPARK-34370][SQL] Support Avro schema evolution for 
partitioned Hive tables using "avro.schema.url"
cc508d1 is described below

commit cc508d17c7176bd3504c9f3af68794e1b98f8a81
Author: “attilapiros” <piros.attila.zs...@gmail.com>
AuthorDate: Sat Feb 6 17:25:39 2021 -0800

    [SPARK-34370][SQL] Support Avro schema evolution for partitioned Hive 
tables using "avro.schema.url"
    
    ### What changes were proposed in this pull request?
    
    With https://github.com/apache/spark/pull/31133 Avro schema evolution was 
introduced for partitioned Hive tables where the schema is given by 
`avro.schema.literal`.
    Here that functionality is extended to support schema evolution where the 
schema is defined via `avro.schema.url`.
    
    ### Why are the changes needed?
    
    Without this PR the problem described in 
https://github.com/apache/spark/pull/31133 can be reproduced with tables where 
`avro.schema.url` is used, because in that case the property value given at 
the partition level is always used for `avro.schema.url`.
    
    So for example when a new column (with a default value) is added to the 
table, one of the following problems happens:
    -  when the new field is added after the last one the cell values will be 
null values instead of the default value
    -  when the schema is extended somewhere before the last field then values 
will be listed for the wrong column positions
    
    A similar error will happen when one of the fields is removed from the schema.
    
    For details please check the attached unit tests where both cases are 
checked.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Fixes the potential value error.
    
    ### How was this patch tested?
    
    The existing unit tests for schema evolution are generalized and reused.
    New tests:
    - `SPARK-34370: support Avro schema evolution (add column with 
avro.schema.url)`
    - `SPARK-34370: support Avro schema evolution (remove column with 
avro.schema.url)`
    
    Closes #31501 from attilapiros/SPARK-34370.
    
    Authored-by: “attilapiros” <piros.attila.zs...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/hive/TableReader.scala    |   4 +-
 .../src/test/resources/schemaWithOneField.avsc     |  12 ++
 .../src/test/resources/schemaWithTwoFields.avsc    |  16 ++
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 174 ++++++++++++---------
 4 files changed, 132 insertions(+), 74 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 27fd2cc..96949a3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -240,6 +240,8 @@ class HadoopTableReader(
       fillPartitionKeys(partValues, mutableRow)
 
       val tableProperties = tableDesc.getProperties
+      val avroSchemaProperties = Seq(AvroTableProperties.SCHEMA_LITERAL,
+        AvroTableProperties.SCHEMA_URL).map(_.getPropName())
 
       // Create local references so that the outer object isn't serialized.
       val localTableDesc = tableDesc
@@ -256,7 +258,7 @@ class HadoopTableReader(
         // properties.
         val props = new Properties(tableProperties)
         partProps.asScala.filterNot { case (k, _) =>
-          k == AvroTableProperties.SCHEMA_LITERAL.getPropName() && 
tableProperties.containsKey(k)
+          avroSchemaProperties.contains(k) && tableProperties.containsKey(k)
         }.foreach {
           case (key, value) => props.setProperty(key, value)
         }
diff --git a/sql/hive/src/test/resources/schemaWithOneField.avsc 
b/sql/hive/src/test/resources/schemaWithOneField.avsc
new file mode 100644
index 0000000..e6e2431
--- /dev/null
+++ b/sql/hive/src/test/resources/schemaWithOneField.avsc
@@ -0,0 +1,12 @@
+{
+  "namespace": "test",
+  "name": "some_schema",
+  "type": "record",
+  "fields": [
+    {
+      "name": "col2",
+      "type": "string"
+    }
+  ]
+}
+
diff --git a/sql/hive/src/test/resources/schemaWithTwoFields.avsc 
b/sql/hive/src/test/resources/schemaWithTwoFields.avsc
new file mode 100644
index 0000000..3d1d24c
--- /dev/null
+++ b/sql/hive/src/test/resources/schemaWithTwoFields.avsc
@@ -0,0 +1,16 @@
+{
+  "namespace": "test",
+  "name": "some_schema",
+  "type": "record",
+  "fields": [
+    {
+      "name": "col1",
+      "type": "string",
+      "default": "col1_default"
+    },
+    {
+      "name": "col2",
+      "type": "string"
+    }
+  ]
+}
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 9f5b045..d8a3bf6 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, 
CONVERT_METASTORE_PARQUET}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
-import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHiveSparkSession}
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton, 
TestHiveSparkSession}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -1853,104 +1853,132 @@ class HiveDDLSuite
     }
   }
 
-  test("SPARK-26836: support Avro schema evolution (add column)") {
+  test("SPARK-34370: support Avro schema evolution (add column with 
avro.schema.url)") {
+    checkAvroSchemaEvolutionAddColumn(
+      
s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithOneField.avsc").toURI}'",
+      
s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithTwoFields.avsc").toURI}'")
+  }
+
+  test("SPARK-26836: support Avro schema evolution (add column with 
avro.schema.literal)") {
+    val originalSchema =
+      """
+        |{
+        |  "namespace": "test",
+        |  "name": "some_schema",
+        |  "type": "record",
+        |  "fields": [
+        |    {
+        |      "name": "col2",
+        |      "type": "string"
+        |    }
+        |  ]
+        |}
+      """.stripMargin
+    val evolvedSchema =
+      """
+        |{
+        |  "namespace": "test",
+        |  "name": "some_schema",
+        |  "type": "record",
+        |  "fields": [
+        |    {
+        |      "name": "col1",
+        |      "type": "string",
+        |      "default": "col1_default"
+        |    },
+        |    {
+        |      "name": "col2",
+        |      "type": "string"
+        |    }
+        |  ]
+        |}
+      """.stripMargin
+    checkAvroSchemaEvolutionAddColumn(
+      s"'avro.schema.literal'='$originalSchema'",
+      s"'avro.schema.literal'='$evolvedSchema'")
+  }
+
+  private def checkAvroSchemaEvolutionAddColumn(
+    originalSerdeProperties: String,
+    evolvedSerdeProperties: String) = {
     withTable("t") {
-      val originalSchema =
-        """
-          |{
-          |  "namespace": "test",
-          |  "name": "some_schema",
-          |  "type": "record",
-          |  "fields": [
-          |    {
-          |      "name": "col2",
-          |      "type": "string"
-          |    }
-          |  ]
-          |}
-        """.stripMargin
       sql(
         s"""
           |CREATE TABLE t PARTITIONED BY (ds string)
           |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
-          |WITH SERDEPROPERTIES ('avro.schema.literal'='$originalSchema')
+          |WITH SERDEPROPERTIES ($originalSerdeProperties)
           |STORED AS
           |INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
           |OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
         """.stripMargin)
       sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')")
-      val evolvedSchema =
-        """
-          |{
-          |  "namespace": "test",
-          |  "name": "some_schema",
-          |  "type": "record",
-          |  "fields": [
-          |    {
-          |      "name": "col1",
-          |      "type": "string",
-          |      "default": "col1_default"
-          |    },
-          |    {
-          |      "name": "col2",
-          |      "type": "string"
-          |    }
-          |  ]
-          |}
-        """.stripMargin
-      sql(s"ALTER TABLE t SET SERDEPROPERTIES 
('avro.schema.literal'='$evolvedSchema')")
+      sql(s"ALTER TABLE t SET SERDEPROPERTIES ($evolvedSerdeProperties)")
       sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 
'col2_value')")
       checkAnswer(spark.table("t"), Row("col1_default", "col2_value", 
"1981-01-07")
         :: Row("col1_value", "col2_value", "1983-04-27") :: Nil)
     }
   }
 
-  test("SPARK-26836: support Avro schema evolution (remove column)") {
+  test("SPARK-34370: support Avro schema evolution (remove column with 
avro.schema.url)") {
+    checkAvroSchemaEvolutionRemoveColumn(
+      
s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithTwoFields.avsc").toURI}'",
+      
s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithOneField.avsc").toURI}'")
+  }
+
+  test("SPARK-26836: support Avro schema evolution (remove column with 
avro.schema.literal)") {
+    val originalSchema =
+      """
+        |{
+        |  "namespace": "test",
+        |  "name": "some_schema",
+        |  "type": "record",
+        |  "fields": [
+        |    {
+        |      "name": "col1",
+        |      "type": "string",
+        |      "default": "col1_default"
+        |    },
+        |    {
+        |      "name": "col2",
+        |      "type": "string"
+        |    }
+        |  ]
+        |}
+      """.stripMargin
+    val evolvedSchema =
+      """
+        |{
+        |  "namespace": "test",
+        |  "name": "some_schema",
+        |  "type": "record",
+        |  "fields": [
+        |    {
+        |      "name": "col2",
+        |      "type": "string"
+        |    }
+        |  ]
+        |}
+      """.stripMargin
+    checkAvroSchemaEvolutionRemoveColumn(
+      s"'avro.schema.literal'='$originalSchema'",
+      s"'avro.schema.literal'='$evolvedSchema'")
+  }
+
+  private def checkAvroSchemaEvolutionRemoveColumn(
+    originalSerdeProperties: String,
+    evolvedSerdeProperties: String) = {
     withTable("t") {
-      val originalSchema =
-        """
-          |{
-          |  "namespace": "test",
-          |  "name": "some_schema",
-          |  "type": "record",
-          |  "fields": [
-          |    {
-          |      "name": "col1",
-          |      "type": "string",
-          |      "default": "col1_default"
-          |    },
-          |    {
-          |      "name": "col2",
-          |      "type": "string"
-          |    }
-          |  ]
-          |}
-        """.stripMargin
       sql(
         s"""
           |CREATE TABLE t PARTITIONED BY (ds string)
           |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
-          |WITH SERDEPROPERTIES ('avro.schema.literal'='$originalSchema')
+          |WITH SERDEPROPERTIES ($originalSerdeProperties)
           |STORED AS
           |INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
           |OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
         """.stripMargin)
       sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 
'col2_value')")
-      val evolvedSchema =
-        """
-          |{
-          |  "namespace": "test",
-          |  "name": "some_schema",
-          |  "type": "record",
-          |  "fields": [
-          |    {
-          |      "name": "col2",
-          |      "type": "string"
-          |    }
-          |  ]
-          |}
-        """.stripMargin
-      sql(s"ALTER TABLE t SET SERDEPROPERTIES 
('avro.schema.literal'='$evolvedSchema')")
+      sql(s"ALTER TABLE t SET SERDEPROPERTIES ($evolvedSerdeProperties)")
       sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')")
       checkAnswer(spark.table("t"), Row("col2_value", "1981-01-07")
         :: Row("col2_value", "1983-04-27") :: Nil)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to