[ https://issues.apache.org/jira/browse/HUDI-6500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
weiming updated HUDI-6500: -------------------------- Attachment: image-2023-07-12-17-51-55-790.png > Using the RuntimeReplaceable function in the merge into matched condition > reports an error > ------------------------------------------------------------------------------------------ > > Key: HUDI-6500 > URL: https://issues.apache.org/jira/browse/HUDI-6500 > Project: Apache Hudi > Issue Type: Bug > Reporter: weiming > Assignee: weiming > Priority: Major > Attachments: image-2023-07-12-17-51-55-790.png > > > step1: > create table and write some data > create table test_table ( > id string, > name string, > age int, > address array<string>, > ts long, > dt string > ) using hudi > tblproperties ( > type = 'cow', > primaryKey = 'id', > preCombineField = 'ts' > ) > partitioned by (dt); > insert overwrite test_table partition(dt = '2023-03-01') select "01", > "zhangsan01", 17, array('bj','sh'), 1688376626321; > step2: use merge into with nvl or ifNull function > merge into test_table as target > using ( > select '01' as id, 'zhangsan01_new' as name, 18 as age, array('bj1','sh1','gz1') > as address, 1688376626322 as ts, '2023-03-01' as dt > ) source > on target.id = source.id and target.dt = source.dt > when matched then > update set > target.name = source.name, > target.age = if(nvl(source.age, 0) <> nvl(target.age, 0), source.age, target.age), > target.address = source.address, > target.ts = source.ts > when not matched then insert * ; > > > error log: > > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 26.0 failed 4 times, most recent failure: Lost task 0.3 in stage 26.0 > (TID 20) (bigdata-test.com executor 2): > org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType > UPDATE for partition :0 > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:349) > at > 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:264) > at > org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) > at > org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:911) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:911) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:333) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369) > at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:382) > at > org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) > at > org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:380) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:333) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:131) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1463) > at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.hudi.exception.HoodieException: > org.apache.hudi.exception.HoodieException: > org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new > record with old value in storage, for new record {HoodieRecord{key=HoodieKey > { recordKey=01 partitionPath=dt=2023-03-01}, > currentLocation='HoodieRecordLocation \{instantTime=20230705190339617, > fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}', > newLocation='HoodieRecordLocation \{instantTime=20230705191354955, > fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value > \{HoodieRecord{key=null, currentLocation='null', newLocation='null'}} > at > org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:156) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:385) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:376) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342) > ... 
28 more > Caused by: org.apache.hudi.exception.HoodieException: > org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new > record with old value in storage, for new record \{HoodieRecord{key=HoodieKey > { recordKey=01 partitionPath=dt=2023-03-01} > , currentLocation='HoodieRecordLocation \{instantTime=20230705190339617, > fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}', > newLocation='HoodieRecordLocation \{instantTime=20230705191354955, > fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value > \{HoodieRecord{key=null, currentLocation='null', newLocation='null'}} > at > org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:73) > at > org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154) > ... 31 more > Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to > combine/merge new record with old value in storage, for new record > {HoodieRecord{key=HoodieKey > { recordKey=01 partitionPath=dt=2023-03-01} > , currentLocation='HoodieRecordLocation \{instantTime=20230705190339617, > fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}', > newLocation='HoodieRecordLocation \{instantTime=20230705191354955, > fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value > \{HoodieRecord{key=null, currentLocation='null', newLocation='null'}} > at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:358) > at > org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:54) > at > org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:44) > at > org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67) > ... 
32 more > Caused by: java.lang.UnsupportedOperationException: Cannot evaluate > expression: nvl(age#24, 0) > at > org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:79) > at > org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:309) > at > org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:308) > at > org.apache.spark.sql.catalyst.expressions.Nvl.eval(nullExpressions.scala:185) > at > org.apache.spark.sql.catalyst.expressions.ExpressionProxy.proxyEval(SubExprEvaluationRuntime.scala:132) > at > org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime$$anon$1.load(SubExprEvaluationRuntime.scala:46) > at > org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime$$anon$1.load(SubExprEvaluationRuntime.scala:44) > at > org.sparkproject.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) > at > org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) > at > org.sparkproject.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) > at > org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) > at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000) > at org.sparkproject.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) > at > org.sparkproject.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) > at > org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.getEval(SubExprEvaluationRuntime.scala:53) > at > org.apache.spark.sql.catalyst.expressions.ExpressionProxy.eval(SubExprEvaluationRuntime.scala:134) > at > org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:566) > at > org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:477) > at > org.apache.spark.sql.catalyst.expressions.If.eval(conditionalExpressions.scala:68) > at > 
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:168) > at > org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:117) > at > org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:32) > at > org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3(ExpressionPayload.scala:130) > at > org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3$adapted(ExpressionPayload.scala:120) > at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984) > at > org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:120) > at > org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:88) > at > org.apache.hudi.common.model.HoodieAvroRecordMerger.combineAndGetUpdateValue(HoodieAvroRecordMerger.java:84) > at > org.apache.hudi.common.model.HoodieAvroRecordMerger.merge(HoodieAvroRecordMerger.java:60) > at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:340) > ... 35 more -- This message was sent by Atlassian Jira (v8.20.10#820010)