[ https://issues.apache.org/jira/browse/HUDI-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Udit Mehrotra updated HUDI-2641: -------------------------------- Description: The bug is easily reproducible when running 4-5 concurrent writers at once. Atleast a few writers would end up failing with the following stack trace in Hudi 0.8.0: {noformat} py4j.protocol.Py4JJavaError: An error occurred while calling o122.save. : java.lang.IllegalArgumentException at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31) at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:396) at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:377) at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:154) at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:212) at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:185) at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121) at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:479) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133) at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748){noformat} Based on my debugging, this is what is going on: * In our start commit/clean logic, we have a step where Hoodie rolls back any failed writes [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L669]. As part of the logic, it needs to identify instants that need to be rolled back. In multi writer case, it depends upon the heart beats (whether expired on not) to make a determination whether an inflight commit is a failed write or not and can be rolled back [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L828]. * And as part of the rolling back failed writes logic it makes this call to delete heartbeats of rolled back/expired commits [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L813]. *This call is being made irrespective of whether or not there is any instant to rollback and the logic in that method is glaringly wrong. It just goes and deletes all the heartbeats of all the inflight commits.* * So one commit while starting the commit process goes ahead and deletes all the heartbeats of other inflight commits. Meanwhile another commit has reached the _cleaner_ stage where it again checks if there are any commits to rollback. Now, because of missing/deleted heartbeats of some of the other commits it assumes them to be expired/failed writes and starts rolling them back and deletes their INFLIGHT commit file on the timeline. * Ultimately, the inflight commits that are incorrectly rolled back end up failing with the above exception because INFLIGHT state of that commit no longer exists in the timeline, for it to be able to complete the commit process successfully. was: The bug is easily reproducible when running 4-5 concurrent writers at once. Atleast a few writers would end up failing with the following stack trace in Hudi 0.8.0: {noformat} py4j.protocol.Py4JJavaError: An error occurred while calling o122.save. : java.lang.IllegalArgumentException at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31) at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:396) at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:377) at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:154) at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:212) at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:185) at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121) at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:479) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133) at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748){noformat} Based on my debugging, this is what is going on: * In our start commit/clean logic, we have a step where Hoodie rolls back any failed writes [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L669]. As part of the logic, it needs to identify instants that need to be rolled back. In multi writer case, it depends upon the heart beats (whether expired on not) to make a determination whether an inflight commit is a failed write or not and can be rolled back [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L828]. > One inflight commit rolling back other concurrent inflight commits causing > them to fail > --------------------------------------------------------------------------------------- > > Key: HUDI-2641 > URL: https://issues.apache.org/jira/browse/HUDI-2641 > Project: Apache Hudi > Issue Type: Bug > Components: Writer Core > Reporter: Udit Mehrotra > Assignee: Udit Mehrotra > Priority: Critical > Labels: release-blocker > Fix For: 0.10.0 > > > The bug is easily reproducible when running 4-5 concurrent writers at once. > Atleast a few writers would end up failing with the following stack trace in > Hudi 0.8.0: > {noformat} > py4j.protocol.Py4JJavaError: An error occurred while calling o122.save. > : java.lang.IllegalArgumentException > at > org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31) > at > org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:396) > at > org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:377) > at > org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:154) > at > org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:212) > at > org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:185) > at > org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121) > at > org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:479) > at > org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223) > at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145) > at > org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133) > at > org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989) > at > org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) > at > org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) > at > org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135) > at > org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) > at > org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134) > at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) > at > org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989) > at > org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438) > at > org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:282) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748){noformat} > > Based on my debugging, this is what is going on: > * In our start commit/clean logic, we have a step where Hoodie rolls back > any failed writes > [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L669]. > As part of the logic, it needs to identify instants that need to be rolled > back. In multi writer case, it depends upon the heart beats (whether expired > on not) to make a determination whether an inflight commit is a failed write > or not and can be rolled back > [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L828]. > * And as part of the rolling back failed writes logic it makes this call to > delete heartbeats of rolled back/expired commits > [here|https://github.com/apache/hudi/blob/release-0.8.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java#L813]. > *This call is being made irrespective of whether or not there is any instant > to rollback and the logic in that method is glaringly wrong. It just goes and > deletes all the heartbeats of all the inflight commits.* > * So one commit while starting the commit process goes ahead and deletes all > the heartbeats of other inflight commits. Meanwhile another commit has > reached the _cleaner_ stage where it again checks if there are any commits to > rollback. Now, because of missing/deleted heartbeats of some of the other > commits it assumes them to be expired/failed writes and starts rolling them > back and deletes their INFLIGHT commit file on the timeline. > * Ultimately, the inflight commits that are incorrectly rolled back end up > failing with the above exception because INFLIGHT state of that commit no > longer exists in the timeline, for it to be able to complete the commit > process successfully. -- This message was sent by Atlassian Jira (v8.3.4#803005)