[ https://issues.apache.org/jira/browse/HUDI-6500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

weiming updated HUDI-6500:
--------------------------
    Attachment: image-2023-07-12-17-51-55-790.png

> Using the RuntimeReplaceable function in the merge into matched condition 
> reports an error
> ------------------------------------------------------------------------------------------
>
>                 Key: HUDI-6500
>                 URL: https://issues.apache.org/jira/browse/HUDI-6500
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: weiming
>            Assignee: weiming
>            Priority: Major
>         Attachments: image-2023-07-12-17-51-55-790.png
>
>
> Step 1: create the table and write some data.
> create table test_table (
> id string,
> name string,
> age int,
> address array<string>,
> ts long,
> dt string
> ) using hudi
> tblproperties (
> type = 'cow',
> primaryKey = 'id',
> preCombineField = 'ts'
> )
> partitioned by (dt);
> insert overwrite test_table partition(dt = '2023-03-01') select "01",
> "zhangsan01", 17, array('bj','sh'), 1688376626321;
> Step 2: run MERGE INTO with the nvl or ifnull function.
> merge into test_table as target
> using (
> select '01' as id, 'zhangsan01_new' as name, 18 as age,
> array('bj1','sh1','gz1') as address, 1688376626322 as ts, '2023-03-01' as dt
> ) source
> on target.id = source.id and target.dt = source.dt
> when matched then
> update set
> target.name = source.name,
> target.age = if(nvl(source.age, 0) <> nvl(target.age, 0), source.age, target.age),
> target.address = source.address,
> target.ts = source.ts
> when not matched then insert *;
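> For context, nvl and ifnull are RuntimeReplaceable expressions in Spark: the
> parser and analyzer accept them, but they remain Unevaluable until the
> optimizer's ReplaceExpressions rule rewrites them to their replacement,
> coalesce. A quick equivalence check in a plain query (where the optimizer
> does run) shows both forms behave the same:
> 
> -- both columns return 0; nvl(a, b) is sugar for coalesce(a, b)
> select nvl(null, 0) as via_nvl, coalesce(null, 0) as via_coalesce;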
> 
> Error log:
> 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 26.0 failed 4 times, most recent failure: Lost task 0.3 in stage 26.0 
> (TID 20) (bigdata-test.com executor 2): 
> org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType 
> UPDATE for partition :0
>     at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:349)
>     at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:264)
>     at 
> org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
>     at 
> org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:911)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:911)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:333)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)
>     at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:382)
>     at 
> org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
>     at 
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
>     at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
>     at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:380)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:333)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:131)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1463)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hudi.exception.HoodieException: 
> org.apache.hudi.exception.HoodieException: 
> org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new 
> record with old value in storage, for new record {HoodieRecord{key=HoodieKey
> { recordKey=01 partitionPath=dt=2023-03-01}, 
> currentLocation='HoodieRecordLocation {instantTime=20230705190339617, 
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}', 
> newLocation='HoodieRecordLocation {instantTime=20230705191354955, 
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value 
> {HoodieRecord{key=null, currentLocation='null', newLocation='null'}}
>     at 
> org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:156)
>     at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:385)
>     at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:376)
>     at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342)
>     ... 28 more
> Caused by: org.apache.hudi.exception.HoodieException: 
> org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new 
> record with old value in storage, for new record {HoodieRecord{key=HoodieKey 
> { recordKey=01 partitionPath=dt=2023-03-01}, 
> currentLocation='HoodieRecordLocation {instantTime=20230705190339617, 
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}', 
> newLocation='HoodieRecordLocation {instantTime=20230705191354955, 
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value 
> {HoodieRecord{key=null, currentLocation='null', newLocation='null'}}
>     at 
> org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:73)
>     at 
> org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
>     ... 31 more
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to 
> combine/merge new record with old value in storage, for new record 
> {HoodieRecord{key=HoodieKey { recordKey=01 partitionPath=dt=2023-03-01}, 
> currentLocation='HoodieRecordLocation {instantTime=20230705190339617, 
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}', 
> newLocation='HoodieRecordLocation {instantTime=20230705191354955, 
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value 
> {HoodieRecord{key=null, currentLocation='null', newLocation='null'}}
>     at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:358)
>     at 
> org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:54)
>     at 
> org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:44)
>     at 
> org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67)
>     ... 32 more
> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate 
> expression: nvl(age#24, 0)
>     at 
> org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:79)
>     at 
> org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:309)
>     at 
> org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:308)
>     at 
> org.apache.spark.sql.catalyst.expressions.Nvl.eval(nullExpressions.scala:185)
>     at 
> org.apache.spark.sql.catalyst.expressions.ExpressionProxy.proxyEval(SubExprEvaluationRuntime.scala:132)
>     at 
> org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime$$anon$1.load(SubExprEvaluationRuntime.scala:46)
>     at 
> org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime$$anon$1.load(SubExprEvaluationRuntime.scala:44)
>     at 
> org.sparkproject.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>     at 
> org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>     at 
> org.sparkproject.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>     at 
> org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>     at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000)
>     at org.sparkproject.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>     at 
> org.sparkproject.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>     at 
> org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.getEval(SubExprEvaluationRuntime.scala:53)
>     at 
> org.apache.spark.sql.catalyst.expressions.ExpressionProxy.eval(SubExprEvaluationRuntime.scala:134)
>     at 
> org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:566)
>     at 
> org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:477)
>     at 
> org.apache.spark.sql.catalyst.expressions.If.eval(conditionalExpressions.scala:68)
>     at 
> org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:168)
>     at 
> org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:117)
>     at 
> org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:32)
>     at 
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3(ExpressionPayload.scala:130)
>     at 
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3$adapted(ExpressionPayload.scala:120)
>     at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
>     at 
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:120)
>     at 
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:88)
>     at 
> org.apache.hudi.common.model.HoodieAvroRecordMerger.combineAndGetUpdateValue(HoodieAvroRecordMerger.java:84)
>     at 
> org.apache.hudi.common.model.HoodieAvroRecordMerger.merge(HoodieAvroRecordMerger.java:60)
>     at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:340)
>     ... 35 more
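> 
> The root cause is visible at the bottom of the trace: ExpressionPayload
> evaluates the matched-clause assignments with an InterpretedSafeProjection,
> and the nvl call reaches Unevaluable.eval because it was never rewritten by
> the optimizer. A possible workaround (a sketch, not verified against this
> build) is to write the evaluable coalesce directly instead of nvl:
> 
> merge into test_table as target
> using (
> select '01' as id, 'zhangsan01_new' as name, 18 as age,
> array('bj1','sh1','gz1') as address, 1688376626322 as ts, '2023-03-01' as dt
> ) source
> on target.id = source.id and target.dt = source.dt
> when matched then
> update set
> target.name = source.name,
> -- coalesce implements eval, so the interpreted projection can run it
> target.age = if(coalesce(source.age, 0) <> coalesce(target.age, 0), source.age, target.age),
> target.address = source.address,
> target.ts = source.ts
> when not matched then insert *;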



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
