[ https://issues.apache.org/jira/browse/HUDI-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexey Kudinkin updated HUDI-4879: ---------------------------------- Description: As reported by the user: [https://github.com/apache/hudi/issues/6354] Currently, setting {{hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' }}will result in the following exception: {code:java} org.apache.hudi.exception.HoodieUpsertExceptio n: Error upserting bucketType UPDATE for partition :0 at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merg e new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}} at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349) at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) ... 28 more Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}} at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161) at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147) ... 
31 more Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { r ecordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}} at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155) ... 
32 more Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLoc ation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}} at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351) at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122) at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112) at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135) at java.util.concurrent.FutureTask.run(FutureTask.java:266) ... 3 more Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long at java.lang.Long.compareTo(Long.java:54) at org.apache.hudi.common.model.DefaultHoodieRecordPayload.needUpdatingPersistedRecord(DefaultHoodieRecordPayload.java:139) at org.apache.hudi.common.model.DefaultHoodieRecordPayload.combineAndGetUpdateValue(DefaultHoodieRecordPayload.java:62) at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332) ... 
8 more3071575 [task-result-getter-2] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 996.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 996.0 failed 4 times, most recent failure: Lost task 0.3 in stage 996.0 (TID 27262) (szzb-bg-uat-sdp-hadoop-05 executor 8): or g.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0 at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merg e new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}} at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349) at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) ... 28 more {code} Steps to reproduce: {code:java} --step 1: create table drop table hudi_cow_pk_cbfield_tbl; create table hudi_cow_pk_cbfield_tbl ( id bigint, name string, ts bigint ) using hudi tblproperties ( type = 'cow', primaryKey = 'id', preCombineField = 'ts', hoodie.datasource.write.hive_style_partitioning = false, hoodie.datasource.write.operation = 'upsert', hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' ) ; --step 2: insert a record with primaryKey=1, preCombineField=1000 insert into hudi_cow_pk_cbfield_tbl select 1, 'a0', 1000; --step 3: 'insert' with the same primaryKey, but change the preCombineField value to the smaller value 100; the action does not take effect, which is expected (note: this is normal behavior when setting the param: hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload') insert into hudi_cow_pk_cbfield_tbl select 1, 'a0_new', 100; select * from hudi_cow_pk_cbfield_tbl; --step 4: 'update' action with the same primaryKey, but change the preCombineField value to the smaller value 20; the action does not take effect, which is expected (note: this is normal behavior when setting the param: hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload') update hudi_cow_pk_cbfield_tbl set name='a1_new',ts=20 where id= 1; select * from hudi_cow_pk_cbfield_tbl; --step 5: 'merge into' action with the same primaryKey; an error occurs: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0 merge into hudi_cow_pk_cbfield_tbl as target using (select 1 as id,'a1_merge' as name,2000 as ts) as source on target.id = source.id when matched then update set * when not matched then insert * ; {code} was: As reported by the user: [https://github.com/apache/hudi/issues/6354] Currently, setting
{{hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'}} > MERGE INTO fails when setting "hoodie.datasource.write.payload.class" > --------------------------------------------------------------------- > > Key: HUDI-4879 > URL: https://issues.apache.org/jira/browse/HUDI-4879 > Project: Apache Hudi > Issue Type: Bug > Reporter: Alexey Kudinkin > Assignee: Alexey Kudinkin > Priority: Blocker > Fix For: 0.12.1 > > > As reported by the user: > [https://github.com/apache/hudi/issues/6354] > > Currently, setting {{hoodie.datasource.write.payload.class = > 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' }}will result in > the following exception: > {code:java} > org.apache.hudi.exception.HoodieUpsertExceptio > n: Error upserting bucketType UPDATE for partition :0 at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244) > at > org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) > at > org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) > at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) > at > org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) > at 
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:131) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.hudi.exception.HoodieException: > org.apache.hudi.exception.HoodieException: > java.util.concurrent.ExecutionException: > org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merg > e new record with old value in storage, for new record > {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, > currentLocation='HoodieRecordLocation {instantTime=20220810095846644, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', > newLocation='HoodieRecordLocation {instantTime=20220810101719437, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value > {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": > "20220810095824514_0_0", "_hoodie_record_key": "id:1", > "_hoodie_partition_path": "", "_hoodie_file_name": > 
"60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", > "id": 1, "name": "a0", "ts": 1000}} at > org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) > ... 28 more > Caused by: org.apache.hudi.exception.HoodieException: > java.util.concurrent.ExecutionException: > org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new > record with old value in storage, for > new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, > currentLocation='HoodieRecordLocation {instantTime=20220810095846644, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', > newLocation='HoodieRecordLocation {instantTime=20220810101719437, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value > {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": > "20220810095824514_0_0", "_hoodie_record_key": "id:1", > "_hoodie_partition_path": "", "_hoodie_file_name": > "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", > "id": 1, "name": "a0", "ts": 1000}} at > org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161) > at > org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147) > ... 
31 more > Caused by: java.util.concurrent.ExecutionException: > org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new > record with old value in storage, for new record {HoodieRecord{key=HoodieKey > { r > ecordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation > {instantTime=20220810095846644, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', > newLocation='HoodieRecordLocation {instantTime=20220810101719437, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value > {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": > "20220810095824514_0_0", "_hoodie_record_key": "id:1", > "_hoodie_partition_path": "", "_hoodie_file_name": > "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", > "id": 1, "name": "a0", "ts": 1000}} at > java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155) > ... 
32 more > Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to > combine/merge new record with old value in storage, for new record > {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLoc > ation='HoodieRecordLocation {instantTime=20220810095846644, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', > newLocation='HoodieRecordLocation {instantTime=20220810101719437, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value > {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": > "20220810095824514_0_0", "_hoodie_record_key": "id:1", > "_hoodie_partition_path": "", "_hoodie_file_name": > "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", > "id": 1, "name": "a0", "ts": 1000}} at > org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351) > at > org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122) > at > org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112) > at > org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37) > at > org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > ... 3 more > Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to > java.lang.Long > at java.lang.Long.compareTo(Long.java:54) > at > org.apache.hudi.common.model.DefaultHoodieRecordPayload.needUpdatingPersistedRecord(DefaultHoodieRecordPayload.java:139) > at > org.apache.hudi.common.model.DefaultHoodieRecordPayload.combineAndGetUpdateValue(DefaultHoodieRecordPayload.java:62) > at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332) > ... 
8 more3071575 [task-result-getter-2] ERROR > org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 996.0 failed 4 > times; aborting job > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 996.0 failed 4 times, most recent failure: Lost task 0.3 in stage 996.0 > (TID 27262) (szzb-bg-uat-sdp-hadoop-05 executor 8): or > g.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType > UPDATE for partition :0 at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244) > at > org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) > at > org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) > at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) > at > org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) > at > org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) > at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:335) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:131) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.hudi.exception.HoodieException: > org.apache.hudi.exception.HoodieException: > java.util.concurrent.ExecutionException: > org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merg > e new record with old value in storage, for new record > {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, > currentLocation='HoodieRecordLocation {instantTime=20220810095846644, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', > newLocation='HoodieRecordLocation {instantTime=20220810101719437, > fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value > {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": > "20220810095824514_0_0", "_hoodie_record_key": "id:1", > "_hoodie_partition_path": "", "_hoodie_file_name": > "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", > "id": 1, "name": "a0", "ts": 1000}} at > org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358) > at > 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) > ... 28 more {code} > Steps to reproduce: > {code:java} > --step 1: create table > drop table hudi_cow_pk_cbfield_tbl; > create table hudi_cow_pk_cbfield_tbl ( > id bigint, > name string, > ts bigint > ) using hudi > tblproperties ( > type = 'cow', > primaryKey = 'id', > preCombineField = 'ts', > hoodie.datasource.write.hive_style_partitioning = false, > hoodie.datasource.write.operation = 'upsert', > hoodie.datasource.write.payload.class = > 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' > ) > ; > --step 2: insert a record with primaryKey=1, preCombineField=1000 > insert into hudi_cow_pk_cbfield_tbl select 1, 'a0', 1000; > --step 3: 'insert' with the same primaryKey, but change the preCombineField > value to the smaller value 100; the action does not take effect, which is > expected (note: this is normal behavior when setting the param: > hoodie.datasource.write.payload.class = > 'org.apache.hudi.common.model.DefaultHoodieRecordPayload') > insert into hudi_cow_pk_cbfield_tbl select 1, 'a0_new', 100; > select * from hudi_cow_pk_cbfield_tbl; > --step 4: 'update' action with the same primaryKey, but change the > preCombineField value to the smaller value 20; the action does not take > effect, which is expected (note: this is normal behavior when setting the param: > hoodie.datasource.write.payload.class = > 'org.apache.hudi.common.model.DefaultHoodieRecordPayload') > update hudi_cow_pk_cbfield_tbl set name='a1_new',ts=20 where id= 1; > select * from hudi_cow_pk_cbfield_tbl; > --step 5: 'merge into' action with the same primaryKey; an error occurs: > org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType > UPDATE for partition :0 > merge into hudi_cow_pk_cbfield_tbl as target > using (select 1 as id,'a1_merge' as name,2000 as ts) as source > on target.id = 
source.id > when matched then update set * > when not matched then insert * > ; {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)