[ https://issues.apache.org/jira/browse/HUDI-2495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sivabalan narayanan updated HUDI-2495: -------------------------------------- Fix Version/s: 0.10.0 > Difference in behavior between GenericRecord based key gen and Row based key > gen > --------------------------------------------------------------------------------- > > Key: HUDI-2495 > URL: https://issues.apache.org/jira/browse/HUDI-2495 > Project: Apache Hudi > Issue Type: Sub-task > Components: Spark Integration > Reporter: sivabalan narayanan > Assignee: sivabalan narayanan > Priority: Blocker > Labels: sev:critical, user-support-issues > Fix For: 0.10.0 > > > When complex key gen is used and one of the fields in the record key is a > timestamp field, the row writer path and the RDD path give different record key > values. The GenericRecord path converts the timestamp, whereas the row writer path does > not do any conversion. > > import java.sql.Timestamp > import spark.implicits._ > val df = Seq( > (1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"), > (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"), > (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"), > (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def") > ).toDF("typeId","eventTime", "str") > > df.write.format("hudi"). > option("hoodie.insert.shuffle.parallelism", "2"). > option("hoodie.upsert.shuffle.parallelism", "2"). > option("hoodie.bulkinsert.shuffle.parallelism", "2"). > option("hoodie.datasource.write.precombine.field", "typeId"). > option("hoodie.datasource.write.partitionpath.field", "typeId"). > option("hoodie.datasource.write.recordkey.field", "str,eventTime"). > > option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator"). > option("hoodie.table.name", "hudi_tbl"). > mode(Overwrite). 
> save("/tmp/hudi_tbl_trial/") > > val hudiDF = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/") > hudiDF.createOrReplaceTempView("hudi_sql_tbl") > spark.sql("select _hoodie_record_key, str, eventTime, typeId from > hudi_sql_tbl").show(false) > > {code:java} > +----------------------------------+---+-------------------+------+ > |_hoodie_record_key |str|eventTime |typeId| > +----------------------------------+---+-------------------+------+ > |str:abc,eventTime:1417369232000000|abc|2014-11-30 12:40:32|1 | > |str:abc,eventTime:1388635201000000|abc|2014-01-01 23:00:01|1 | > |str:def,eventTime:1462803163000000|def|2016-05-09 10:12:43|2 | > |str:def,eventTime:1483023240000000|def|2016-12-29 09:54:00|2 | > +----------------------------------+---+-------------------+------+ > {code} > > > // now retry w/ bulk_insert row writer path > df.write.format("hudi"). > option("hoodie.insert.shuffle.parallelism", "2"). > option("hoodie.upsert.shuffle.parallelism", "2"). > option("hoodie.bulkinsert.shuffle.parallelism", "2"). > option("hoodie.datasource.write.precombine.field", "typeId"). > option("hoodie.datasource.write.partitionpath.field", "typeId"). > option("hoodie.datasource.write.recordkey.field", "str,eventTime"). > > option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator"). > option("hoodie.table.name", "hudi_tbl"). > option("hoodie.datasource.write.operation","bulk_insert"). > mode(Overwrite). 
> save("/tmp/hudi_tbl_trial_bulk_insert/") > > val hudiDF_bulk_insert = > spark.read.format("hudi").load("/tmp/hudi_tbl_trial_bulk_insert/") > hudiDF_bulk_insert.createOrReplaceTempView("hudi_sql_tbl_bulk_insert") > spark.sql("select _hoodie_record_key, str, eventTime, typeId from > hudi_sql_tbl_bulk_insert").show(false) > {code:java} > +---------------------------------------+---+-------------------+------+ > |_hoodie_record_key |str|eventTime |typeId| > +---------------------------------------+---+-------------------+------+ > |str:def,eventTime:2016-05-09 10:12:43.0|def|2016-05-09 10:12:43|2 | > |str:def,eventTime:2016-12-29 09:54:00.0|def|2016-12-29 09:54:00|2 | > |str:abc,eventTime:2014-01-01 23:00:01.0|abc|2014-01-01 23:00:01|1 | > |str:abc,eventTime:2014-11-30 12:40:32.0|abc|2014-11-30 12:40:32|1 | > +---------------------------------------+---+-------------------+------+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)