hudi-bot opened a new issue, #14769:
URL: https://github.com/apache/hudi/issues/14769

   sql("select * from employee_rt").show(false)
   
    
   
   {code}
   174262 [Executor task launch worker for task 12017] ERROR org.apache.spark.executor.Executor  - Exception in task 0.0 in stage 31.0 (TID 12017)
   java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
       at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector.getPrimitiveJavaObject(WritableTimestampObjectInspector.java:39)
       at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14(TableReader.scala:468)
       at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14$adapted(TableReader.scala:467)
       at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$18(TableReader.scala:493)
       at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
       at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
       at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
       at org.apache.spark.scheduler.Task.run(Task.scala:127)
       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
   {code}
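   
   The cast fails inside Spark's Hive read path ({{HadoopTableReader}} handing a {{LongWritable}} to {{WritableTimestampObjectInspector}}), so reading the same files through the Hudi datasource, which bypasses the Hive SerDe entirely, is a useful sanity check that the data itself is intact. A minimal sketch, assuming the base path used in the steps below:
   
   {code:scala}
   // Snapshot read through the Hudi datasource instead of the Hive-synced
   // employee_rt table; this path never touches the Hive object inspectors.
   // Older Hudi releases may need a partition glob such as "/tmp/employee/*".
   val snapshotDF = spark.read.format("hudi").load("/tmp/employee")
   snapshotDF.show(false)
   {code}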
   
   Steps to reproduce:
   
   {code:scala}
   import org.apache.spark.sql._
   import org.apache.spark.sql.SaveMode._
   import org.apache.spark.sql.functions._
   import org.apache.spark.sql.streaming._
   import org.apache.hudi.QuickstartUtils._
   import org.apache.hudi.DataSourceReadOptions
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   import org.apache.hudi.keygen._
   import scala.collection.JavaConversions._
   
   // Run in spark-shell, where spark.implicits._ (needed for .toDF) is in scope.
   case class Person(firstname: String, age: Int, gender: Int)
   
   // insert_time is the timestamp column that fails when read back through
   // the employee_rt realtime view.
   val personDF = List(Person("tom", 45, 1), Person("iris", 44, 0)).toDF.
     withColumn("ts", unix_timestamp).
     withColumn("insert_time", current_timestamp)
   //val personDF2 = List(Person("peng", 56, 1), Person("iris", 51, 0), Person("jacky", 25, 1)).toDF.
   //  withColumn("ts", unix_timestamp).
   //  withColumn("insert_time", current_timestamp)
   
   //personDF.write.mode(SaveMode.Overwrite).format("hudi").saveAsTable("employee")
   
   val tableName = "employee"
   val hudiCommonOptions = Map(
     "hoodie.compact.inline" -> "true",
     "hoodie.compact.inline.max.delta.commits" -> "5",
     "hoodie.base.path" -> s"/tmp/$tableName",
     "hoodie.table.name" -> tableName,
     "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
     "hoodie.datasource.write.operation" -> "upsert",
     "hoodie.clean.async" -> "true"
   )
   
   val hudiHiveOptions = Map(
     DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
     DataSourceWriteOptions.HIVE_URL_OPT_KEY -> "jdbc:hive2://localhost:10000",
     DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "gender",
     DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
     // Sync timestamp fields to Hive as TIMESTAMP instead of BIGINT; this is
     // the option implicated in the failure above (see the linked issue below).
     "hoodie.datasource.hive_sync.support_timestamp" -> "true",
     DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> tableName,
     DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
   )
   
   val basePath = s"/tmp/$tableName"
   personDF.write.format("hudi").
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "firstname").
     option(PARTITIONPATH_FIELD_OPT_KEY, "gender").
     options(hudiCommonOptions).
     options(hudiHiveOptions).
     mode(SaveMode.Overwrite).
     save(basePath)
   
   // Fails with the ClassCastException shown above.
   sql("select * from employee_rt").show(false)
   {code}
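   
   To confirm what the Hive sync produced, the table schema can be inspected without touching the failing read path (metadata only); with {{support_timestamp}} enabled, {{insert_time}} should be reported as {{timestamp}} rather than {{bigint}}. A quick check:
   
   {code:scala}
   // Inspect the Hive-synced realtime table's schema. insert_time is expected
   // to show as timestamp when support_timestamp is enabled.
   sql("describe employee_rt").show(false)
   {code}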
   
   More info: https://github.com/apache/hudi/issues/2544
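   
   A possible workaround sketch, not part of this report and resting on the assumption that the column is stored as parquet {{timestamp-micros}} (an INT64 of epoch microseconds): leave {{hoodie.datasource.hive_sync.support_timestamp}} at its default so the field is synced to Hive as {{bigint}}, then convert on read.
   
   {code:scala}
   // Hypothetical workaround: with support_timestamp left at "false",
   // insert_time is synced as a bigint of epoch microseconds, so the realtime
   // view never casts it to TimestampWritable. Convert it back in the query:
   sql("""
     select firstname, age, gender, ts,
            cast(insert_time / 1000000 as timestamp) as insert_time
     from employee_rt
   """).show(false)
   {code}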
   
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-1736
   - Type: Bug

