[ https://issues.apache.org/jira/browse/HUDI-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexey Kudinkin updated HUDI-4879: ---------------------------------- Reviewers: Alexey Kudinkin > MERGE INTO fails when setting "hoodie.datasource.write.payload.class" > --------------------------------------------------------------------- > > Key: HUDI-4879 > URL: https://issues.apache.org/jira/browse/HUDI-4879 > Project: Apache Hudi > Issue Type: Bug > Reporter: Alexey Kudinkin > Assignee: Jian Feng > Priority: Blocker > Fix For: 0.12.1 > > > As reported by the user: > [https://github.com/apache/hudi/issues/6354] > > Currently, setting \{{hoodie.datasource.write.payload.class = > 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' }}will result in > the following exception: > {code:java} > org.apache.hudi.exception.HoodieUpsertExceptio > n: Error upserting bucketType UPDATE for partition :0 at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244) > at > org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) > at > org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) > at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) > at > org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) > at > org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:131) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.hudi.exception.HoodieException: > org.apache.hudi.exception.HoodieException: > java.util.concurrent.ExecutionException: > org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merg > e new record with old value in storage, for new record > {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, > currentLocation='HoodieRecordLocation {instantTime=20220810095846644, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', > newLocation='HoodieRecordLocation {instantTime=20220810101719437, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value > {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": > "20220810095824514_0_0", "_hoodie_record_key": "id:1", > "_hoodie_partition_path": "", "_hoodie_file_name": > "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", > "id": 1, "name": "a0", "ts": 1000}} at > org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) > ... 28 more > Caused by: org.apache.hudi.exception.HoodieException: > java.util.concurrent.ExecutionException: > org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new > record with old value in storage, for > new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, > currentLocation='HoodieRecordLocation {instantTime=20220810095846644, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', > newLocation='HoodieRecordLocation {instantTime=20220810101719437, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value > {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": > "20220810095824514_0_0", "_hoodie_record_key": "id:1", > "_hoodie_partition_path": "", "_hoodie_file_name": > "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", > "id": 1, "name": "a0", "ts": 1000}} at > org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161) > at > org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147) > ... 31 more > Caused by: java.util.concurrent.ExecutionException: > org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new > record with old value in storage, for new record {HoodieRecord{key=HoodieKey > { r > ecordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation > {instantTime=20220810095846644, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', > newLocation='HoodieRecordLocation {instantTime=20220810101719437, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value > {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": > "20220810095824514_0_0", "_hoodie_record_key": "id:1", > "_hoodie_partition_path": "", "_hoodie_file_name": > "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", > "id": 1, "name": "a0", "ts": 1000}} at > java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155) > ... 32 more > Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to > combine/merge new record with old value in storage, for new record > {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLoc > ation='HoodieRecordLocation {instantTime=20220810095846644, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', > newLocation='HoodieRecordLocation {instantTime=20220810101719437, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value > {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": > "20220810095824514_0_0", "_hoodie_record_key": "id:1", > "_hoodie_partition_path": "", "_hoodie_file_name": > "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", > "id": 1, "name": "a0", "ts": 1000}} at > org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351) > at > org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122) > at > org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112) > at > org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37) > at > org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > ... 3 more > Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to > java.lang.Long > at java.lang.Long.compareTo(Long.java:54) > at > org.apache.hudi.common.model.DefaultHoodieRecordPayload.needUpdatingPersistedRecord(DefaultHoodieRecordPayload.java:139) > at > org.apache.hudi.common.model.DefaultHoodieRecordPayload.combineAndGetUpdateValue(DefaultHoodieRecordPayload.java:62) > at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332) > ... 8 more3071575 [task-result-getter-2] ERROR > org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 996.0 failed 4 > times; aborting job > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 996.0 failed 4 times, most recent failure: Lost task 0.3 in stage 996.0 > (TID 27262) (szzb-bg-uat-sdp-hadoop-05 executor 8): or > g.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType > UPDATE for partition :0 at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244) > at > org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) > at > org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) > at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) > at > org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) > at > org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:131) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.hudi.exception.HoodieException: > org.apache.hudi.exception.HoodieException: > java.util.concurrent.ExecutionException: > org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merg > e new record with old value in storage, for new record > {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, > currentLocation='HoodieRecordLocation {instantTime=20220810095846644, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', > newLocation='HoodieRecordLocation {instantTime=20220810101719437, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value > {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": > "20220810095824514_0_0", "_hoodie_record_key": "id:1", > "_hoodie_partition_path": "", "_hoodie_file_name": > "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", > "id": 1, "name": "a0", "ts": 1000}} at > org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) > ... 28 more {code} > This occurs b/c DefaultHoodieRecordPayload somehow overrides the setting of > ExpressionPayload that have to be used to appropriately handle MERGE INTO > statement. > > Steps to reproduce: > {code:java} > --step 1: create table > drop table hudi_cow_pk_cbfield_tbl; > create table hudi_cow_pk_cbfield_tbl ( > id bigint, > name string, > ts bigint > ) using hudi > tblproperties ( > type = 'cow', > primaryKey = 'id', > preCombineField = 'ts', > hoodie.datasource.write.hive_style_partitioning = false, > hoodie.datasource.write.operation = 'upsert', > hoodie.datasource.write.payload.class = > 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' > ) > ; > --step 2: insert into a recored with primaryKey=1, preCombineField=1000 > insert into hudi_cow_pk_cbfield_tbl select 1, 'a0', 1000; > --step 3: 'insert' with same primaryKey, but change the preCombineField > value to the smaller value 100, the action not effect expectedly (note:is > normal for setting the param: hoodie.datasource.write.payload.class = > 'org.apache.hudi.common.model.DefaultHoodieRecordPayload') > insert into hudi_cow_pk_cbfield_tbl select 1, 'a0_new', 100; > select * from hudi_cow_pk_cbfield_tbl; > --step 4: 'update' action with same primaryKey, but change the > preCombineField value to the smaller value 20, the action not effect > expectedly (note:is normal for setting the param: > hoodie.datasource.write.payload.class = > 'org.apache.hudi.common.model.DefaultHoodieRecordPayload') > update hudi_cow_pk_cbfield_tbl set name='a1_new',ts=20 where id= 1; > select * from hudi_cow_pk_cbfield_tbl; > --step 5: 'merge into' action with same primaryKey, occure a error: > org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType > UPDATE for partition :0 > merge into hudi_cow_pk_cbfield_tbl as target > using (select 1 as id,'a1_merge' as name,2000 as ts) as source > on target.id = source.id > when matched then update set * > when not matched then insert * > ; {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)