[ 
https://issues.apache.org/jira/browse/HUDI-7522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-7522:
---------------------------------
    Labels: pull-request-available  (was: )

> Delete the bucket index partition when a bucket id maps to multiple files, so 
> the next write can succeed
> ---------------------------------------------------------------------------------------
>
>                 Key: HUDI-7522
>                 URL: https://issues.apache.org/jira/browse/HUDI-7522
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: writer-core
>            Reporter: xy
>            Priority: Major
>              Labels: pull-request-available
>
> Delete the bucket index partition when a bucket id maps to multiple files, so 
> that the next write can succeed.
> Related issue: https://github.com/apache/hudi/issues/7216
>  
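> For context, the failure comes from the bucket index sanity check: within a 
> partition, each bucket id must map to exactly one file group, and the write 
> aborts as soon as two files claim the same bucket id. Below is a minimal, 
> self-contained sketch of that kind of check (plain Java, not the actual Hudi 
> code; the file-name layout and helper names are assumptions):
> 
>   import java.util.HashMap;
>   import java.util.List;
>   import java.util.Map;
> 
>   public class BucketIdCheck {
> 
>     // Illustrative assumption: the bucket id is encoded as a fixed-width
>     // numeric prefix of the data file name (e.g. "00000000-<uuid>_...").
>     static int bucketIdFromFileName(String fileName) {
>       return Integer.parseInt(fileName.substring(0, 8));
>     }
> 
>     // Builds a bucketId -> fileName mapping for one partition and fails fast
>     // when two files claim the same bucket id, mirroring the
>     // "Find multiple files ... belongs to the same bucket id" error below.
>     static Map<Integer, String> mapBucketsToFiles(String partitionPath,
>                                                   List<String> fileNames) {
>       Map<Integer, String> bucketToFile = new HashMap<>();
>       for (String fileName : fileNames) {
>         int bucketId = bucketIdFromFileName(fileName);
>         String previous = bucketToFile.putIfAbsent(bucketId, fileName);
>         if (previous != null) {
>           throw new IllegalStateException("Find multiple files at partition path="
>               + partitionPath + " belongs to the same bucket id = " + bucketId);
>         }
>       }
>       return bucketToFile;
>     }
>   }
> 
> Stack trace of the failed upsert: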
> at 
> org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
> at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:46)
> at 
> org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:89)
> at 
> org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:76)
> at 
> org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:155)
> at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:213)
> at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:306)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:171)
> at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
> at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
> at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
> at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:119)
> at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:119)
> at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:115)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
> at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
> at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
> at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
> at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
> at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
> at 
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:115)
> at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
> at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
> at 
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:137)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
> at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
> at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
> at 
> com.ctrip.muise.spark.scala.cdc.BinlogToHudiTaskExecutor$.saveByIdMod(BinlogToHudiTaskExecutor.scala:494)
> at 
> com.ctrip.muise.spark.scala.cdc.BinlogToHudiTaskExecutor$.saveCDC(BinlogToHudiTaskExecutor.scala:355)
> at 
> com.ctrip.muise.spark.scala.cdc.BinlogToHudiTaskExecutor$.saveToHudi(BinlogToHudiTaskExecutor.scala:59)
> at com.ctrip.muise.spark.scala.cdc.model.CdcTask.innerRun(CdcTask.scala:217)
> at com.ctrip.muise.spark.scala.cdc.model.CdcTask.run(CdcTask.scala:50)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 9 in stage 10640.0 failed 4 times, most recent failure: Lost task 9.3 in 
> stage 10640.0 (TID 498184) (svr34044hw2288.hadoop.sh5.ctripcorp.com executor 
> 1): java.lang.RuntimeException: org.apache.hudi.exception.HoodieIOException: 
> Find multiple files at partition 
> path=_db=cffinesigndb/_tbl=electronic_contract_162/_id_mod=0 belongs to the 
> same bucket id = 0
>  
> at 
> org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
> at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at 
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
> at org.apache.spark.scheduler.Task.run(Task.scala:137)
> at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1510)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hudi.exception.HoodieIOException: Find multiple files 
> at partition path=_db=cffinesigndb/_tbl=electronic_contract_162/_id_mod=0 
> belongs to the same bucket id = 0
> at 
> org.apache.hudi.index.bucket.HoodieBucketIndex.lambda$loadPartitionBucketIdFileIdMapping$0(HoodieBucketIndex.java:122)
> at java.util.ArrayList.forEach(ArrayList.java:1249)
> at 
> org.apache.hudi.index.bucket.HoodieBucketIndex.loadPartitionBucketIdFileIdMapping(HoodieBucketIndex.java:114)
> at 
> org.apache.hudi.index.bucket.HoodieBucketIndex.access$200(HoodieBucketIndex.java:46)
> at 
> org.apache.hudi.index.bucket.HoodieBucketIndex$1.computeNext(HoodieBucketIndex.java:87)
> at 
> org.apache.hudi.index.bucket.HoodieBucketIndex$1.computeNext(HoodieBucketIndex.java:74)
> at 
> org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
> ... 15 more
>  
> Driver stacktrace:
> at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2459)
> at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2408)
> at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2407)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2407)
>  
> When a bucket id maps to multiple files, the partition needs to be deleted so 
> that the next write can succeed.
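> 
> A rough sketch of that recovery step, assuming a hypothetical helper built on 
> the Hadoop FileSystem API (this is not an existing Hudi API, only an outline 
> of the requested behaviour):
> 
>   import java.io.IOException;
> 
>   import org.apache.hadoop.conf.Configuration;
>   import org.apache.hadoop.fs.FileSystem;
>   import org.apache.hadoop.fs.Path;
> 
>   public class DuplicateBucketRecovery {
> 
>     // Hypothetical recovery step: once a partition is known to hold more than
>     // one file for the same bucket id, drop the whole partition directory so
>     // the next upsert can rebuild it from scratch. Not an existing Hudi API.
>     static void dropCorruptedPartition(Configuration hadoopConf,
>                                        String basePath,
>                                        String partitionPath) throws IOException {
>       Path partition = new Path(basePath, partitionPath);
>       FileSystem fs = partition.getFileSystem(hadoopConf);
>       if (fs.exists(partition)) {
>         // Recursive delete of the partition's data files; the next write
>         // recreates the partition with a clean bucket id -> file mapping.
>         fs.delete(partition, true);
>       }
>     }
>   }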
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
