jai20242 opened a new issue, #11199:
URL: https://github.com/apache/hudi/issues/11199
Hello.

I am developing a project with Hudi version 0.14.1, and I have written a custom payload class. I have a consistency problem with Merge On Read tables: with Copy On Write everything works well, but with Merge On Read, Hudi calls the method getInsertValue before calling combineAndGetUpdateValue even when the row already exists. This is a problem for me because in getInsertValue I replace the sentinel value @#=BDP_N=#@ with null (when the row does not exist, this value should be null). Then, in combineAndGetUpdateValue, I start from the previous row and replace only the modified columns; if an incoming column is null, I keep the previous value.
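To make the logic concrete, here is a simplified sketch of the payload (illustrative only, not my exact class; it assumes the standard 0.14.x HoodieRecordPayload callbacks and extends OverwriteWithLatestAvroPayload):

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.common.util.{Option => HOption}
import scala.collection.JavaConverters._

class CustomOverwriteWithLatestAvroPayload(record: GenericRecord, orderingVal: Comparable[_])
  extends OverwriteWithLatestAvroPayload(record, orderingVal) {

  private val Sentinel = "@#=BDP_N=#@"

  // Insert path: the row did not exist, so the sentinel means "no value" -> null.
  override def getInsertValue(schema: Schema): HOption[IndexedRecord] = {
    val opt = super.getInsertValue(schema)
    if (opt.isPresent) {
      val rec = opt.get.asInstanceOf[GenericRecord]
      schema.getFields.asScala.foreach { f =>
        if (Sentinel == String.valueOf(rec.get(f.name()))) rec.put(f.name(), null)
      }
    }
    opt
  }

  // Update path: start from the incoming record (sentinel still intact),
  // keep the stored value for untouched (null) columns, and write an
  // explicit null where the sentinel appears.
  override def combineAndGetUpdateValue(current: IndexedRecord, schema: Schema): HOption[IndexedRecord] = {
    val incomingOpt = super.getInsertValue(schema)
    if (!incomingOpt.isPresent) {
      HOption.empty()
    } else {
      val incoming = incomingOpt.get.asInstanceOf[GenericRecord]
      val stored = current.asInstanceOf[GenericRecord]
      schema.getFields.asScala.foreach { f =>
        val v = incoming.get(f.name())
        if (v == null) incoming.put(f.name(), stored.get(f.name()))           // column not modified
        else if (Sentinel == String.valueOf(v)) incoming.put(f.name(), null)  // explicit null
      }
      HOption.of[IndexedRecord](incoming)
    }
  }
}
```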
Let me explain with an example:
**1) Insert a first row**
spark.createDataFrame(Seq(("1",2L,"value1","value2","2",false))).toDF("key",
"sort","field1","field2","partition","_hoodie_is_deleted").
write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(CDC_ENABLED.key(), "true").
option(TABLE_NAME, "pruebasc").
option("hoodie.datasource.write.payload.class","CustomOverwriteWithLatestAvroPayload").
option("hoodie.avro.schema.validate","false").
option("hoodie.datasource.write.recordkey.field","key").
option("hoodie.datasource.write.precombine.field","sort").
option("hoodie.datasource.write.new.columns.nullable", "true").
option("hoodie.datasource.write.reconcile.schema","true").
option("hoodie.metadata.enable","false").
option("hoodie.index.type","SIMPLE").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.compact.inline","true").
mode(Overwrite).
save("/tmp/pruebasc")
I have the following row:
key -> 1
sort -> 2
field1 -> value1
field2 -> value2
partition -> 2
_hoodie_is_deleted -> false
**2) I update the row**. The custom payload replaces the value @#=BDP_N=#@ with null, and the columns that were not modified keep their previous values:
spark.createDataFrame(Seq(("1",3L,"@#=BDP_N=#@","value3","2",false))).toDF("key",
"sort","field1","field2","partition","_hoodie_is_deleted").withColumn("field1",
col("field1").cast("String")).
write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(CDC_ENABLED.key(), "true").
option(TABLE_NAME, "pruebasc").
option("hoodie.datasource.write.payload.class","CustomOverwriteWithLatestAvroPayload").
option("hoodie.avro.schema.validate","false").
option("hoodie.datasource.write.recordkey.field","key").
option("hoodie.datasource.write.precombine.field","sort").
option("hoodie.datasource.write.new.columns.nullable", "true"). //Las
columnas nuevas insertadas las pone a null
option("hoodie.datasource.write.reconcile.schema","true"). //coge el
esquema de lo insertado y no del nuevo registro. Importante que todo sea nulo
menos la PK
option("hoodie.metadata.enable","false").
option("hoodie.index.type","SIMPLE").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.compact.inline","true").
mode(Append).
save("/tmp/pruebasc")
I have the following row:
key -> 1
sort -> 3
**field1 -> value1**
field2 -> value3
partition -> 2
_hoodie_is_deleted -> false
But the correct row should be:
key -> 1
sort -> 3
**field1 -> null**
field2 -> value3
partition -> 2
_hoodie_is_deleted -> false
I have checked my logs, and the problem is that with Merge On Read, Hudi calls the method getInsertValue even when the key already exists, before calling combineAndGetUpdateValue. Because getInsertValue has already turned the sentinel into null, combineAndGetUpdateValue then treats the column as unmodified and restores the previous value instead of writing null.
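This is roughly how I traced the call order (hypothetical instrumentation layered on the sketch above, not my production logging):

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.util.{Option => HOption}
import org.slf4j.LoggerFactory

// Hypothetical wrapper used only to trace the call order; point
// hoodie.datasource.write.payload.class at this class while debugging.
class LoggingPayload(record: GenericRecord, orderingVal: Comparable[_])
  extends CustomOverwriteWithLatestAvroPayload(record, orderingVal) {

  private val log = LoggerFactory.getLogger(classOf[LoggingPayload])

  override def getInsertValue(schema: Schema): HOption[IndexedRecord] = {
    log.info("getInsertValue called")           // with MOR this fires first, even for an existing key
    super.getInsertValue(schema)
  }

  override def combineAndGetUpdateValue(current: IndexedRecord, schema: Schema): HOption[IndexedRecord] = {
    log.info("combineAndGetUpdateValue called") // fires after getInsertValue
    super.combineAndGetUpdateValue(current, schema)
  }
}
```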
Regards