[ 
https://issues.apache.org/jira/browse/HUDI-5270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Zhang updated HUDI-5270:
----------------------------
    Fix Version/s: 0.13.1
                       (was: 0.14.0)

> Duplicate key error when insert_overwrite same partition in multi writer
> ------------------------------------------------------------------------
>
>                 Key: HUDI-5270
>                 URL: https://issues.apache.org/jira/browse/HUDI-5270
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: multi-writer, spark-sql
>    Affects Versions: 0.11.0
>            Reporter: weiming
>            Assignee: weiming
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.13.1
>
>
> If the occ is enabled for hudi spark table, multiple threads insert_overwrite 
> the same partition. The data of the later task should overwrite the data of 
> the previous task. However, an error occurs.
> {code:java}
> // execute sql insert overwrite same partition
> ##THREAD-1 EXECUTE SQL
> insert overwrite table hudi_test_wm1_mor_02 partition (dt = '2021-12-14',hh = 
> '6') select id,name,price,ts from hudi_test_wm1_mor_01 where dt='2021-12-11' 
> and hh ='2';
> ##THREAD-2 EXECUTE SQL
> insert overwrite table hudi_test_wm1_mor_02 partition (dt = '2021-12-14',hh = 
> '6') select id,name,price,ts from hudi_test_wm1_mor_01 where dt='2021-12-11' 
> and hh ='4'; {code}
> {code:java}
> // ERROR LOG
> 22/11/07 15:24:53 ERROR SparkSQLDriver: Failed in [insert overwrite table 
> hudi_test_wm1_mor_02 partition (dt = '2021-12-14',hh = '6') select 
> id,name,price,ts from hudi_test_wm1_mor_01 where dt='2021-12-11' and hh 
> ='4']java.lang.IllegalStateException: Duplicate key 
> [20221107152403967__replacecommit__COMPLETED]    at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)    
> at java.util.HashMap.merge(HashMap.java:1245)    at 
> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)    at 
> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)    at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)    at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)    at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)    
> at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)  
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)  
>   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) 
>    at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)  
>   at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)   
>  at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270) 
>    at java.util.Iterator.forEachRemaining(Iterator.java:116)    at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)  
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)  
>   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) 
>    at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)  
>   at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)   
>  at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270) 
>    at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)  
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)  
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)    
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)    
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)    
> at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.resetFileGroupsReplaced(AbstractTableFileSystemView.java:244)
>     at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:108)
>     at 
> org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:108)
>     at 
> org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:102)
>     at 
> org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:93)
>     at 
> org.apache.hudi.metadata.HoodieMetadataFileSystemView.<init>(HoodieMetadataFileSystemView.java:44)
>     at 
> org.apache.hudi.common.table.view.FileSystemViewManager.createInMemoryFileSystemView(FileSystemViewManager.java:166)
>     at 
> org.apache.hudi.common.table.view.FileSystemViewManager.lambda$createViewManager$5fcdabfe$1(FileSystemViewManager.java:259)
>     at 
> org.apache.hudi.common.table.view.FileSystemViewManager.lambda$getFileSystemView$1(FileSystemViewManager.java:111)
>     at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>     at 
> org.apache.hudi.common.table.view.FileSystemViewManager.getFileSystemView(FileSystemViewManager.java:110)
>     at org.apache.hudi.table.HoodieTable.getHoodieView(HoodieTable.java:310)  
>   at org.apache.hudi.table.HoodieSparkTable.create(HoodieSparkTable.java:92)  
>   at org.apache.hudi.table.HoodieSparkTable.create(HoodieSparkTable.java:67)  
>   at 
> org.apache.hudi.client.SparkRDDWriteClient.createTable(SparkRDDWriteClient.java:129)
>     at 
> org.apache.hudi.client.BaseHoodieWriteClient.scheduleTableServiceInternal(BaseHoodieWriteClient.java:1353)
>     at 
> org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:865)
>     at 
> org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:838)
>     at 
> org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:892)
>     at 
> org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:615)
>     at 
> org.apache.hudi.client.BaseHoodieWriteClient.postCommit(BaseHoodieWriteClient.java:534)
>     at 
> org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:237)
>     at 
> org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
>     at 
> org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:656)
>     at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:319)   
>  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:165)    
> at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
>     at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
>     at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
>     at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
>     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)   
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
>     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
>     at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
>     at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
>     at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
>     at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
>     at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>     at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
>     at 
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
>     at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
>     at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
>     at 
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
>     at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)    
> at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
>     at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)  
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)    
> at 
> org.apache.spark.sql.hudi.catalog.HoodieV1WriteBuilder$$anon$1$$anon$2.insert(HoodieInternalV2Table.scala:116)
>     at 
> org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1(V1FallbackWriters.scala:79)
>     at 
> org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1$(V1FallbackWriters.scala:78)
>     at 
> org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExecV1.writeWithV1(V1FallbackWriters.scala:51)
>     at 
> org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run(V1FallbackWriters.scala:66)
>     at 
> org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run$(V1FallbackWriters.scala:65)
>     at 
> org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExecV1.run(V1FallbackWriters.scala:51)
>     at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
>     at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
>     at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
>     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)   
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
>     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
>     at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
>     at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
>     at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
>     at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
>     at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>     at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
>     at 
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
>     at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
>     at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
>     at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)    at 
> org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)    at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)    at 
> org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)    at 
> org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)    
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)    at 
> org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)    at 
> org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)    at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:113)
>     at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:432)
>     at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:574)
>     at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:568)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)    at 
> scala.collection.Iterator.foreach$(Iterator.scala:943)    at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431)    at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74)    at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73)    at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56)    at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:568)
>     at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:304)
>     at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
>    at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)    at 
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)  
>   at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
>     at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) 
>    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)    at 
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)    at 
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1055)  
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1064)    at 
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala){code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to