Hussein Awala created HUDI-5310:
-----------------------------------

             Summary: Corrupted timeline after 2 concurrent INSERT_OVERWRITE_TABLE operations
                 Key: HUDI-5310
                 URL: https://issues.apache.org/jira/browse/HUDI-5310
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Hussein Awala


We have a non-partitioned table that we overwrite weekly with a Spark job triggered from Airflow. After a task restart in Airflow, two concurrent jobs ran and inserted the same data. Both finished successfully, but every subsequent job failed with the following exception:
{code:java}
2022-11-30 14:01:29,717 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20221130140117316
        at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
        at org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor.execute(SparkInsertOverwriteCommitActionExecutor.java:63)
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insertOverwriteTable(HoodieSparkCopyOnWriteTable.java:164)
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insertOverwriteTable(HoodieSparkCopyOnWriteTable.java:97)
        at org.apache.hudi.client.SparkRDDWriteClient.insertOverwriteTable(SparkRDDWriteClient.java:226)
        at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:210)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:331)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at fr.leboncoin.data.lakehouse.datasource.HudiDatasource.operation(HudiDatasource.java:224)
        at fr.leboncoin.data.lakehouse.datasource.HudiDatasource.insertOverwriteTable(HudiDatasource.java:262)
        at fr.leboncoin.data.lakehouse.run.PrivacyClosedStores.run(PrivacyClosedStores.java:105)
        at fr.leboncoin.data.lakehouse.run.PrivacyClosedStores.main(PrivacyClosedStores.java:121)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)
Caused by: java.lang.IllegalStateException: Duplicate key [20221120004252778__replacecommit__COMPLETED]
        at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
        at java.util.HashMap.merge(HashMap.java:1255)
        at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
        at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
        at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
        at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
        at org.apache.hudi.common.table.view.AbstractTableFileSystemView.resetFileGroupsReplaced(AbstractTableFileSystemView.java:242)
        at org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:108)
        at org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:108)
        at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:102)
        at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:93)
        at org.apache.hudi.table.HoodieTable.getFileSystemView(HoodieTable.java:290)
        at org.apache.hudi.table.action.commit.UpsertPartitioner.getPartitionPathToPendingClusteringFileGroupsId(UpsertPartitioner.java:136)
        at org.apache.hudi.table.action.commit.UpsertPartitioner.assignInserts(UpsertPartitioner.java:169)
        at org.apache.hudi.table.action.commit.UpsertPartitioner.<init>(UpsertPartitioner.java:95)
        at org.apache.hudi.table.action.commit.SparkInsertOverwritePartitioner.<init>(SparkInsertOverwritePartitioner.java:41)
        at org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor.getPartitioner(SparkInsertOverwriteCommitActionExecutor.java:70)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:161)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:85)
        at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57)
        ... 45 more
) {code}
After debugging, we found a bug in [this method|https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java#L218], which encounters the same {{partitionToReplaceFileIds}} in the two commits and fails on the resulting duplicate key.
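The failure mode can be shown in isolation: {{resetFileGroupsReplaced}} indexes the replaced file groups of all {{replacecommit}} instants into a map with {{Collectors.toMap}}, whose default merge function throws on duplicate keys. A minimal sketch of that failure mode (plain strings stand in for {{HoodieFileGroupId}} and {{HoodieInstant}}; the class and variable names are illustrative, not Hudi's actual code):
{code:java}
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DuplicateReplaceKeyDemo {
  public static void main(String[] args) {
    // One (replaced fileGroupId -> replace instant) entry per commit;
    // both replace commits claim the same replaced file group.
    Stream<SimpleEntry<String, String>> replacedFileGroups = Stream.of(
        new SimpleEntry<>("93081ea3-6e42-4a9f-b34c-f643098e5194-0",
            "20221120004252778__replacecommit__COMPLETED"),
        new SimpleEntry<>("93081ea3-6e42-4a9f-b34c-f643098e5194-0",
            "20221120004340987__replacecommit__COMPLETED"));

    // Collectors.toMap without a merge function installs a throwing merger,
    // so the second entry fails with java.lang.IllegalStateException.
    // (On JDK 8 the "Duplicate key" message prints the existing *value*,
    // which is why the instant, not the file id, appears in the trace above.)
    Map<String, String> replacedIndex = replacedFileGroups.collect(
        Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
  }
}
{code}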


Timeline (the trailing number marks which of the two concurrent jobs wrote each file: job 1 is instant {{20221120004252778}}, job 2 is instant {{20221120004340987}}):
{code:java}
20221120004252778.replacecommit.requested -> 1
20221120004252778.replacecommit.inflight  -> 1
20221120004252778.replacecommit           -> 1
20221120004340987.replacecommit.requested -> 2
20221120004340987.replacecommit.inflight  -> 2
20221120004340987.replacecommit           -> 2 {code}
Commit bodies:
{code:json}
20221120004252778.replacecommit:
{
  "partitionToWriteStats" : {
    "" : [ {
      "fileId" : "1721d4dc-5806-4137-8bff-5af556339959-0",
      "path" : 
"1721d4dc-5806-4137-8bff-5af556339959-0_0-6-211_20221120004252778.parquet",
      "prevCommit" : "null",
      "numWrites" : 122984,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 122984,
      "totalWriteBytes" : 4139555,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 4139555,
      "minEventTime" : null,
      "maxEventTime" : null
    }, {
      "fileId" : "fc9ca09d-e440-40a2-ac08-c7013bcf6d25-0",
      "path" : 
"fc9ca09d-e440-40a2-ac08-c7013bcf6d25-0_1-6-212_20221120004252778.parquet",
      "prevCommit" : "null",
      "numWrites" : 122488,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 122488,
      "totalWriteBytes" : 4126038,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 4126038,
      "minEventTime" : null,
      "maxEventTime" : null
    }, {
      "fileId" : "e295911b-1100-4056-bf2a-7fc8fa778a9f-0",
      "path" : 
"e295911b-1100-4056-bf2a-7fc8fa778a9f-0_2-6-213_20221120004252778.parquet",
      "prevCommit" : "null",
      "numWrites" : 123101,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 123101,
      "totalWriteBytes" : 4147197,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 4147197,
      "minEventTime" : null,
      "maxEventTime" : null
    }, {
      "fileId" : "8d2e26c1-8c92-49b7-9ba6-365b676d4e1a-0",
      "path" : 
"8d2e26c1-8c92-49b7-9ba6-365b676d4e1a-0_3-6-214_20221120004252778.parquet",
      "prevCommit" : "null",
      "numWrites" : 123123,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 123123,
      "totalWriteBytes" : 4153959,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 4153959,
      "minEventTime" : null,
      "maxEventTime" : null
    }, {
      "fileId" : "59ec3de4-73de-4025-83e0-f39eee9a1fc6-0",
      "path" : 
"59ec3de4-73de-4025-83e0-f39eee9a1fc6-0_4-6-215_20221120004252778.parquet",
      "prevCommit" : "null",
      "numWrites" : 115347,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 115347,
      "totalWriteBytes" : 3946550,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 3946550,
      "minEventTime" : null,
      "maxEventTime" : null
    } ]
  },
  "compacted" : false,
  "extraMetadata" : {
    "schema" : "{...},
  "operationType" : "INSERT_OVERWRITE_TABLE",
  "partitionToReplaceFileIds" : {
    "" : [ "93081ea3-6e42-4a9f-b34c-f643098e5194-0", 
"440b39d3-6d3c-4f59-b753-fc81d56c5a1f-0", 
"3a309f89-40ba-4e12-a389-a26677f35fd2-0", 
"32f2b395-a9f5-450f-8086-45996a439678-0", 
"25dc73d9-3e05-40ac-b45a-d7b87c05a678-0" ]
  }
} {code}
{code:json}
20221120004340987.replacecommit:
{
  "partitionToWriteStats" : {
    "" : [ {
      "fileId" : "a479538d-4423-4594-97ec-28f21ffa2030-0",
      "path" : 
"a479538d-4423-4594-97ec-28f21ffa2030-0_0-6-211_20221120004340987.parquet",
      "prevCommit" : "null",
      "numWrites" : 122984,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 122984,
      "totalWriteBytes" : 4139440,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 4139440,
      "minEventTime" : null,
      "maxEventTime" : null
    }, {
      "fileId" : "58f19fb8-9f05-4160-8f80-36123937081d-0",
      "path" : 
"58f19fb8-9f05-4160-8f80-36123937081d-0_1-6-212_20221120004340987.parquet",
      "prevCommit" : "null",
      "numWrites" : 122488,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 122488,
      "totalWriteBytes" : 4141374,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 4141374,
      "minEventTime" : null,
      "maxEventTime" : null
    }, {
      "fileId" : "44a0539f-4e97-44e1-b1f1-c0c809fe4c64-0",
      "path" : 
"44a0539f-4e97-44e1-b1f1-c0c809fe4c64-0_2-6-213_20221120004340987.parquet",
      "prevCommit" : "null",
      "numWrites" : 123101,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 123101,
      "totalWriteBytes" : 4149562,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 4149562,
      "minEventTime" : null,
      "maxEventTime" : null
    }, {
      "fileId" : "eb81d6d3-898e-41a3-8c93-33cc1b3bd4d9-0",
      "path" : 
"eb81d6d3-898e-41a3-8c93-33cc1b3bd4d9-0_3-6-214_20221120004340987.parquet",
      "prevCommit" : "null",
      "numWrites" : 123123,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 123123,
      "totalWriteBytes" : 4152918,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 4152918,
      "minEventTime" : null,
      "maxEventTime" : null
    }, {
      "fileId" : "8ad225d8-286c-42ae-b9bd-359e39f57e3d-0",
      "path" : 
"8ad225d8-286c-42ae-b9bd-359e39f57e3d-0_4-6-215_20221120004340987.parquet",
      "prevCommit" : "null",
      "numWrites" : 115347,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 115347,
      "totalWriteBytes" : 3940020,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 3940020,
      "minEventTime" : null,
      "maxEventTime" : null
    } ]
  },
  "compacted" : false,
  "extraMetadata" : {
    "schema" : "{...},
  "operationType" : "INSERT_OVERWRITE_TABLE",
  "partitionToReplaceFileIds" : {
    "" : [ "93081ea3-6e42-4a9f-b34c-f643098e5194-0", 
"440b39d3-6d3c-4f59-b753-fc81d56c5a1f-0", 
"3a309f89-40ba-4e12-a389-a26677f35fd2-0", 
"32f2b395-a9f5-450f-8086-45996a439678-0", 
"25dc73d9-3e05-40ac-b45a-d7b87c05a678-0" ]
  }
}{code}
After manually deleting the file {{20221120004252778.replacecommit}}, the problem was resolved and the next job completed successfully.
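For reference, each weekly job performs a write equivalent to the sketch below (table name, record key, and key generator are placeholders, not our actual configuration); running two of these concurrently against the same base path produced the timeline above:
{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class WeeklyOverwriteJob {
  // Overwrites the whole non-partitioned table in a single replacecommit.
  static void overwriteTable(Dataset<Row> df, String basePath) {
    df.write()
      .format("hudi")
      .option("hoodie.table.name", "privacy_closed_stores")          // placeholder
      .option("hoodie.datasource.write.operation", "insert_overwrite_table")
      .option("hoodie.datasource.write.recordkey.field", "store_id") // placeholder
      .option("hoodie.datasource.write.keygenerator.class",
              "org.apache.hudi.keygen.NonpartitionedKeyGenerator")   // non-partitioned table
      .mode(SaveMode.Append)
      .save(basePath);
  }
}
{code}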