Hussein Awala created HUDI-5310:
-----------------------------------
Summary: Corrupted timeline after 2 concurrent
INSERT_OVERWRITE_TABLE operations
Key: HUDI-5310
URL: https://issues.apache.org/jira/browse/HUDI-5310
Project: Apache Hudi
Issue Type: Bug
Reporter: Hussein Awala
We have a non-partitioned table which we overwrite weekly using a Spark job
created from Airflow. After a task restart in Airflow, we had two concurrent
jobs which inserted the same data; both finished successfully, but
all subsequent jobs failed with the following exception:
{code:java}
2022-11-30 14:01:29,717 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception:
org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit
time 20221130140117316
at
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
at
org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor.execute(SparkInsertOverwriteCommitActionExecutor.java:63)
at
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insertOverwriteTable(HoodieSparkCopyOnWriteTable.java:164)
at
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insertOverwriteTable(HoodieSparkCopyOnWriteTable.java:97)
at
org.apache.hudi.client.SparkRDDWriteClient.insertOverwriteTable(SparkRDDWriteClient.java:226)
at
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:210)
at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:331)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
at
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
at
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at
fr.leboncoin.data.lakehouse.datasource.HudiDatasource.operation(HudiDatasource.java:224)
at
fr.leboncoin.data.lakehouse.datasource.HudiDatasource.insertOverwriteTable(HudiDatasource.java:262)
at
fr.leboncoin.data.lakehouse.run.PrivacyClosedStores.run(PrivacyClosedStores.java:105)
at
fr.leboncoin.data.lakehouse.run.PrivacyClosedStores.main(PrivacyClosedStores.java:121)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)
Caused by: java.lang.IllegalStateException: Duplicate key
[20221120004252778__replacecommit__COMPLETED]
at
java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1255)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at
org.apache.hudi.common.table.view.AbstractTableFileSystemView.resetFileGroupsReplaced(AbstractTableFileSystemView.java:242)
at
org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:108)
at
org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:108)
at
org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:102)
at
org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:93)
at
org.apache.hudi.table.HoodieTable.getFileSystemView(HoodieTable.java:290)
at
org.apache.hudi.table.action.commit.UpsertPartitioner.getPartitionPathToPendingClusteringFileGroupsId(UpsertPartitioner.java:136)
at
org.apache.hudi.table.action.commit.UpsertPartitioner.assignInserts(UpsertPartitioner.java:169)
at
org.apache.hudi.table.action.commit.UpsertPartitioner.<init>(UpsertPartitioner.java:95)
at
org.apache.hudi.table.action.commit.SparkInsertOverwritePartitioner.<init>(SparkInsertOverwritePartitioner.java:41)
at
org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor.getPartitioner(SparkInsertOverwriteCommitActionExecutor.java:70)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:161)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:85)
at
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57)
... 45 more
) {code}
After debugging, we found a bug in [this
method|https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java#L218],
which encounters the same {{partitionToReplaceFileIds}} in the two commits and
fails with a duplicate-key error when collecting them into a map.
Timeline:
{code:java}
20221120004340987.replacecommit -> 2
20221120004340987.replacecommit.inflight -> 2
20221120004252778.replacecommit -> 1
20221120004340987.replacecommit.requested -> 2
20221120004252778.replacecommit.inflight -> 1
20221120004252778.replacecommit.requested -> 1 {code}
Commit bodies:
{code:json}
20221120004252778.replacecommit:
{
"partitionToWriteStats" : {
"" : [ {
"fileId" : "1721d4dc-5806-4137-8bff-5af556339959-0",
"path" :
"1721d4dc-5806-4137-8bff-5af556339959-0_0-6-211_20221120004252778.parquet",
"prevCommit" : "null",
"numWrites" : 122984,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 122984,
"totalWriteBytes" : 4139555,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 4139555,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "fc9ca09d-e440-40a2-ac08-c7013bcf6d25-0",
"path" :
"fc9ca09d-e440-40a2-ac08-c7013bcf6d25-0_1-6-212_20221120004252778.parquet",
"prevCommit" : "null",
"numWrites" : 122488,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 122488,
"totalWriteBytes" : 4126038,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 4126038,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "e295911b-1100-4056-bf2a-7fc8fa778a9f-0",
"path" :
"e295911b-1100-4056-bf2a-7fc8fa778a9f-0_2-6-213_20221120004252778.parquet",
"prevCommit" : "null",
"numWrites" : 123101,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 123101,
"totalWriteBytes" : 4147197,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 4147197,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "8d2e26c1-8c92-49b7-9ba6-365b676d4e1a-0",
"path" :
"8d2e26c1-8c92-49b7-9ba6-365b676d4e1a-0_3-6-214_20221120004252778.parquet",
"prevCommit" : "null",
"numWrites" : 123123,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 123123,
"totalWriteBytes" : 4153959,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 4153959,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "59ec3de4-73de-4025-83e0-f39eee9a1fc6-0",
"path" :
"59ec3de4-73de-4025-83e0-f39eee9a1fc6-0_4-6-215_20221120004252778.parquet",
"prevCommit" : "null",
"numWrites" : 115347,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 115347,
"totalWriteBytes" : 3946550,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 3946550,
"minEventTime" : null,
"maxEventTime" : null
} ]
},
"compacted" : false,
"extraMetadata" : {
"schema" : "{...},
"operationType" : "INSERT_OVERWRITE_TABLE",
"partitionToReplaceFileIds" : {
"" : [ "93081ea3-6e42-4a9f-b34c-f643098e5194-0",
"440b39d3-6d3c-4f59-b753-fc81d56c5a1f-0",
"3a309f89-40ba-4e12-a389-a26677f35fd2-0",
"32f2b395-a9f5-450f-8086-45996a439678-0",
"25dc73d9-3e05-40ac-b45a-d7b87c05a678-0" ]
}
} {code}
{code:json}
20221120004340987.replacecommit:
{
"partitionToWriteStats" : {
"" : [ {
"fileId" : "a479538d-4423-4594-97ec-28f21ffa2030-0",
"path" :
"a479538d-4423-4594-97ec-28f21ffa2030-0_0-6-211_20221120004340987.parquet",
"prevCommit" : "null",
"numWrites" : 122984,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 122984,
"totalWriteBytes" : 4139440,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 4139440,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "58f19fb8-9f05-4160-8f80-36123937081d-0",
"path" :
"58f19fb8-9f05-4160-8f80-36123937081d-0_1-6-212_20221120004340987.parquet",
"prevCommit" : "null",
"numWrites" : 122488,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 122488,
"totalWriteBytes" : 4141374,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 4141374,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "44a0539f-4e97-44e1-b1f1-c0c809fe4c64-0",
"path" :
"44a0539f-4e97-44e1-b1f1-c0c809fe4c64-0_2-6-213_20221120004340987.parquet",
"prevCommit" : "null",
"numWrites" : 123101,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 123101,
"totalWriteBytes" : 4149562,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 4149562,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "eb81d6d3-898e-41a3-8c93-33cc1b3bd4d9-0",
"path" :
"eb81d6d3-898e-41a3-8c93-33cc1b3bd4d9-0_3-6-214_20221120004340987.parquet",
"prevCommit" : "null",
"numWrites" : 123123,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 123123,
"totalWriteBytes" : 4152918,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 4152918,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "8ad225d8-286c-42ae-b9bd-359e39f57e3d-0",
"path" :
"8ad225d8-286c-42ae-b9bd-359e39f57e3d-0_4-6-215_20221120004340987.parquet",
"prevCommit" : "null",
"numWrites" : 115347,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 115347,
"totalWriteBytes" : 3940020,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 3940020,
"minEventTime" : null,
"maxEventTime" : null
} ]
},
"compacted" : false,
"extraMetadata" : {
"schema" : "{...},
"operationType" : "INSERT_OVERWRITE_TABLE",
"partitionToReplaceFileIds" : {
"" : [ "93081ea3-6e42-4a9f-b34c-f643098e5194-0",
"440b39d3-6d3c-4f59-b753-fc81d56c5a1f-0",
"3a309f89-40ba-4e12-a389-a26677f35fd2-0",
"32f2b395-a9f5-450f-8086-45996a439678-0",
"25dc73d9-3e05-40ac-b45a-d7b87c05a678-0" ]
}
}{code}
After manually deleting the file {{{}20221120004252778.replacecommit{}}}, the
problem was solved and the next job passed successfully.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)