Repository: spark
Updated Branches:
  refs/heads/branch-2.2 df9228b49 -> b17f4063c


[SPARK-17920][SPARK-19580][SPARK-19878][SQL] Backport PR 19779 to branch-2.2 - 
Support writing to Hive table which uses Avro schema url 'avro.schema.url'

## What changes were proposed in this pull request?

> Backport https://github.com/apache/spark/pull/19779 to branch-2.2

SPARK-19580 Support for avro.schema.url while writing to hive table
SPARK-19878 Add hive configuration when initialize hive serde in 
InsertIntoHiveTable.scala
SPARK-17920 HiveWriterContainer passes null configuration to serde.initialize, 
causing NullPointerException in AvroSerde when using avro.schema.url

Support writing to Hive table which uses Avro schema url 'avro.schema.url'
For ex:
create external table avro_in (a string) stored as avro location '/avro-in/' 
tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

create external table avro_out (a string) stored as avro location '/avro-out/' 
tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

insert overwrite table avro_out select * from avro_in; // fails with 
java.lang.NullPointerException

WARN AvroSerDe: Encountered exception determining schema. Returning signal 
schema to indicate problem
java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:182)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
## Changes proposed in this fix
Currently 'null' value is passed to serializer, which causes NPE during insert 
operation, instead pass Hadoop configuration object
## How was this patch tested?
Added new test case in VersionsSuite

Author: vinodkc <vinod.kc...@gmail.com>

Closes #19795 from vinodkc/br_Fix_SPARK-17920_branch-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b17f4063
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b17f4063
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b17f4063

Branch: refs/heads/branch-2.2
Commit: b17f4063cdf52c101ae562ac2a885918acd172ac
Parents: df9228b
Author: vinodkc <vinod.kc...@gmail.com>
Authored: Wed Nov 22 09:21:26 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Wed Nov 22 09:21:26 2017 -0800

----------------------------------------------------------------------
 .../sql/hive/execution/HiveFileFormat.scala     |  4 +-
 .../spark/sql/hive/client/VersionsSuite.scala   | 53 +++++++++++++++++++-
 2 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b17f4063/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index ac735e8..4a7cd69 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -116,7 +116,7 @@ class HiveOutputWriter(
 
   private val serializer = {
     val serializer = 
tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
+    serializer.initialize(jobConf, tableDesc.getProperties)
     serializer
   }
 
@@ -130,7 +130,7 @@ class HiveOutputWriter(
 
   private val standardOI = ObjectInspectorUtils
     .getStandardObjectInspector(
-      tableDesc.getDeserializer.getObjectInspector,
+      tableDesc.getDeserializer(jobConf).getObjectInspector,
       ObjectInspectorCopyOption.JAVA)
     .asInstanceOf[StructObjectInspector]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b17f4063/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 7aff49c..d48a23f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.client
 
-import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
 import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
@@ -697,6 +697,57 @@ class VersionsSuite extends SparkFunSuite with Logging {
         assert(versionSpark.table("t1").collect() === Array(Row(2)))
       }
     }
+
+    test(s"$version: SPARK-17920: Insert into/overwrite avro table") {
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val schemaPath = s"""$path${File.separator}avroschemadir"""
+        val destTableName = "tab1"
+
+        new File(schemaPath).mkdir()
+
+        val avroSchema =
+          """{
+            |"type": "record",
+            | "name": "test_Record",
+            | "namespace": "ns.avro",
+            | "fields" : [
+            |    {"name": "f1", "type": "string"},
+            |    {"name": "f2", "type": ["null", "string"]}
+            |   ]
+            |}
+          """.stripMargin
+
+        withTable(destTableName) {
+          val schemaUrl = s"""$schemaPath${File.separator}avroSchema.avsc"""
+          val schemaFile = new File(schemaPath, "avroSchema.avsc")
+          val writer = new PrintWriter(schemaFile)
+          writer.write(avroSchema)
+          writer.close()
+
+          versionSpark.sql(
+            s"""CREATE TABLE $destTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+           """.stripMargin
+          )
+          val insertStmt = s"INSERT OVERWRITE TABLE $destTableName SELECT 'ABC', 'DEF'"
+          if (version == "0.12") {
+            // Hive 0.12 throws AnalysisException
+            intercept[AnalysisException](versionSpark.sql(insertStmt))
+            } else {
+            val result = versionSpark.sql("SELECT 'ABC', 'DEF'").collect()
+            versionSpark.sql(insertStmt)
+            assert(versionSpark.table(destTableName).collect() === result)
+            versionSpark.sql(s"INSERT INTO TABLE $destTableName SELECT 'ABC', 'DEF'")
+            assert(versionSpark.table(destTableName).collect() === result ++ result)
+            }
+          }
+        }
+      }
     // TODO: add more tests.
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to