[ 
https://issues.apache.org/jira/browse/SPARK-20973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunjian Zhang updated SPARK-20973:
----------------------------------
    Comment: was deleted

(was: I checked the source code and added a patch to fix the insert issue, as 
shown below. I was unable to attach the file here, so I am pasting its content as well.
----------------------------------------------
--- 
a/./workspace1/spark-2.1.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ 
b/./workspace/git/gdr/spark/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -57,7 +57,7 @@ private[hive] class SparkHiveWriterContainer(
   extends Logging
   with HiveInspectors
   with Serializable {
-
+
   private val now = new Date()
   private val tableDesc: TableDesc = fileSinkConf.getTableInfo
   // Add table properties from storage handler to jobConf, so any custom 
storage
@@ -154,6 +154,12 @@ private[hive] class SparkHiveWriterContainer(
     conf.value.setBoolean("mapred.task.is.map", true)
     conf.value.setInt("mapred.task.partition", splitID)
   }
+
+  def newSerializer(tableDesc: TableDesc): Serializer = {
+    val serializer = 
tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+    serializer.initialize(null, tableDesc.getProperties)
+    serializer
+  }
 
   def newSerializer(jobConf: JobConf, tableDesc: TableDesc): Serializer = {
     val serializer = 
tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
@@ -162,10 +168,11 @@ private[hive] class SparkHiveWriterContainer(
   }
 
   protected def prepareForWrite() = {
-    val serializer = newSerializer(jobConf, fileSinkConf.getTableInfo)
+    val serializer = newSerializer(conf.value, fileSinkConf.getTableInfo)
+    logInfo("CHECK table deser:" + 
fileSinkConf.getTableInfo.getDeserializer(conf.value))
     val standardOI = ObjectInspectorUtils
       .getStandardObjectInspector(
-        fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+        
fileSinkConf.getTableInfo.getDeserializer(conf.value).getObjectInspector,
         ObjectInspectorCopyOption.JAVA)
       .asInstanceOf[StructObjectInspector])

> insert table fail caused by unable to fetch data definition file from remote 
> hdfs 
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-20973
>                 URL: https://issues.apache.org/jira/browse/SPARK-20973
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Yunjian Zhang
>              Labels: patch
>         Attachments: spark-sql-insert.patch
>
>
> I implemented my own Hive SerDe to handle special data files; it needs to 
> read a data definition file during processing.
> The process includes:
> 1. read the definition file location from TBLPROPERTIES
> 2. read the file content from the location found in step 1
> 3. initialize the SerDe based on the content read in step 2.
> //DDL of the table as below:
> ---------------------------------------------
> CREATE EXTERNAL TABLE dw_user_stg_txt_out
> ROW FORMAT SERDE 'com.ebay.dss.gdr.hive.serde.abvro.AbvroSerDe'
> STORED AS
>   INPUTFORMAT 'com.ebay.dss.gdr.mapred.AbAsAvroInputFormat'
>   OUTPUTFORMAT 'com.ebay.dss.gdr.hive.ql.io.ab.AvroAsAbOutputFormat'
> LOCATION 'hdfs://${remote_hdfs}/user/data'
> TBLPROPERTIES (
>   'com.ebay.dss.dml.file' = 'hdfs://${remote_hdfs}/dml/user.dml'
> )
> // insert statement
> insert overwrite table dw_user_stg_txt_out select * from dw_user_stg_txt_avro;
> //fail with ERROR
> 17/06/02 15:46:34 ERROR SparkSQLDriver: Failed in [insert overwrite table 
> dw_user_stg_txt_out select * from dw_user_stg_txt_avro]
> java.lang.RuntimeException: FAILED to get dml file from: 
> hdfs://${remote-hdfs}/dml/user.dml
>       at 
> com.ebay.dss.gdr.hive.serde.abvro.AbvroSerDe.initialize(AbvroSerDe.java:109)
>       at 
> org.apache.spark.sql.hive.SparkHiveWriterContainer.newSerializer(hiveWriterContainers.scala:160)
>       at 
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:258)
>       at 
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:170)
>       at 
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:347)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to