Repository: spark Updated Branches: refs/heads/branch-2.2 df9228b49 -> b17f4063c
[SPARK-17920][SPARK-19580][SPARK-19878][SQL] Backport PR 19779 to branch-2.2 - Support writing to Hive table which uses Avro schema url 'avro.schema.url' ## What changes were proposed in this pull request? > Backport https://github.com/apache/spark/pull/19779 to branch-2.2 SPARK-19580 Support for avro.schema.url while writing to hive table SPARK-19878 Add hive configuration when initialize hive serde in InsertIntoHiveTable.scala SPARK-17920 HiveWriterContainer passes null configuration to serde.initialize, causing NullPointerException in AvroSerde when using avro.schema.url Support writing to Hive table which uses Avro schema url 'avro.schema.url' For example: create external table avro_in (a string) stored as avro location '/avro-in/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc'); create external table avro_out (a string) stored as avro location '/avro-out/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc'); insert overwrite table avro_out select * from avro_in; // fails with java.lang.NullPointerException WARN AvroSerDe: Encountered exception determining schema. Returning signal schema to indicate problem java.lang.NullPointerException at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:182) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174) ## Changes proposed in this fix Currently a 'null' configuration is passed to the serializer when it is initialized, which causes an NPE during the insert operation. Instead, pass the Hadoop configuration object (jobConf). ## How was this patch tested? Added a new test case in VersionsSuite. Author: vinodkc <vinod.kc...@gmail.com> Closes #19795 from vinodkc/br_Fix_SPARK-17920_branch-2.2. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b17f4063 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b17f4063 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b17f4063 Branch: refs/heads/branch-2.2 Commit: b17f4063cdf52c101ae562ac2a885918acd172ac Parents: df9228b Author: vinodkc <vinod.kc...@gmail.com> Authored: Wed Nov 22 09:21:26 2017 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Wed Nov 22 09:21:26 2017 -0800 ---------------------------------------------------------------------- .../sql/hive/execution/HiveFileFormat.scala | 4 +- .../spark/sql/hive/client/VersionsSuite.scala | 53 +++++++++++++++++++- 2 files changed, 54 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b17f4063/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index ac735e8..4a7cd69 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -116,7 +116,7 @@ class HiveOutputWriter( private val serializer = { val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] - serializer.initialize(null, tableDesc.getProperties) + serializer.initialize(jobConf, tableDesc.getProperties) serializer } @@ -130,7 +130,7 @@ class HiveOutputWriter( private val standardOI = ObjectInspectorUtils .getStandardObjectInspector( - tableDesc.getDeserializer.getObjectInspector, + tableDesc.getDeserializer(jobConf).getObjectInspector, ObjectInspectorCopyOption.JAVA) 
.asInstanceOf[StructObjectInspector] http://git-wip-us.apache.org/repos/asf/spark/blob/b17f4063/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 7aff49c..d48a23f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.client -import java.io.{ByteArrayOutputStream, File, PrintStream} +import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter} import java.net.URI import org.apache.hadoop.conf.Configuration @@ -697,6 +697,57 @@ class VersionsSuite extends SparkFunSuite with Logging { assert(versionSpark.table("t1").collect() === Array(Row(2))) } } + + test(s"$version: SPARK-17920: Insert into/overwrite avro table") { + withTempDir { dir => + val path = dir.getAbsolutePath + val schemaPath = s"""$path${File.separator}avroschemadir""" + val destTableName = "tab1" + + new File(schemaPath).mkdir() + + val avroSchema = + """{ + |"type": "record", + | "name": "test_Record", + | "namespace": "ns.avro", + | "fields" : [ + | {"name": "f1", "type": "string"}, + | {"name": "f2", "type": ["null", "string"]} + | ] + |} + """.stripMargin + + withTable(destTableName) { + val schemaUrl = s"""$schemaPath${File.separator}avroSchema.avsc""" + val schemaFile = new File(schemaPath, "avroSchema.avsc") + val writer = new PrintWriter(schemaFile) + writer.write(avroSchema) + writer.close() + + versionSpark.sql( + s"""CREATE TABLE $destTableName + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + | OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl') + """.stripMargin + ) + val insertStmt = s"INSERT OVERWRITE TABLE $destTableName SELECT 'ABC', 'DEF'" + if (version == "0.12") { + // Hive 0.12 throws AnalysisException + intercept[AnalysisException](versionSpark.sql(insertStmt)) + } else { + val result = versionSpark.sql("SELECT 'ABC', 'DEF'").collect() + versionSpark.sql(insertStmt) + assert(versionSpark.table(destTableName).collect() === result) + versionSpark.sql(s"INSERT INTO TABLE $destTableName SELECT 'ABC', 'DEF'") + assert(versionSpark.table(destTableName).collect() === result ++ result) + } + } + } + } // TODO: add more tests. } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org