[ https://issues.apache.org/jira/browse/HUDI-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17428499#comment-17428499 ]

Dave Hagman commented on HUDI-2549:
-----------------------------------

While continuing to test, I found that the _*FileAlreadyExistsException*_ can 
occur on both the deltastreamer and secondary writers (spark datasource writers 
in my tests). On my latest run the spark datasource writer created a commit 
"ahead" of the deltastreamer, which caused the deltastreamer to fail with the 
same error as before:
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already 
exists: s3://....
This also surfaced a more insidious issue: the deltastreamer checkpoint state 
is now missing from the most recent commits, so the deltastreamer is unable to 
start. 
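For context on why the checkpoint disappears: the deltastreamer stores its 
source checkpoint in the commit metadata of the commits it writes, so a commit 
produced by the datasource writer carries no checkpoint entry. A minimal 
inspection sketch, assuming Hudi 0.9.x class names, the 
deltastreamer.checkpoint.key metadata key, and a placeholder table path:
{code:scala}
// Walk completed commits and print the deltastreamer checkpoint (if any)
// stored in each commit's metadata. Commits written by the datasource writer
// should print "null", which is why the deltastreamer cannot find a
// checkpoint once such a commit is the latest one.
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient

val metaClient = HoodieTableMetaClient.builder()
  .setConf(new Configuration())
  .setBasePath("s3://bucket/events_mwc_test_v1") // placeholder base path
  .build()

val commits = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
commits.getInstants.forEach { instant =>
  val meta = HoodieCommitMetadata.fromBytes(
    commits.getInstantDetails(instant).get(), classOf[HoodieCommitMetadata])
  println(s"${instant.getTimestamp} -> ${meta.getMetadata("deltastreamer.checkpoint.key")}")
}
{code}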

[~shivnarayan] [~vinoth] Can you confirm whether you are able to reproduce this 
issue? I remember seeing that you have run this exact configuration without 
issue before. If that is the case, then I am quite confused as to why it would 
not work for me on a brand-new table. 

> Exceptions when using second writer into Hudi table managed by DeltaStreamer
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-2549
>                 URL: https://issues.apache.org/jira/browse/HUDI-2549
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>            Reporter: Dave Hagman
>            Assignee: Dave Hagman
>            Priority: Critical
>              Labels: multi-writer, sev:critical
>             Fix For: 0.10.0
>
>
> When running the DeltaStreamer along with a second spark datasource writer 
> (with [ZK-based OCC 
> enabled|https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing]),
> we receive the following exception, which halts the spark datasource writer. 
> This occurs after warnings of timeline inconsistencies:
>  
> {code:java}
> 21/10/07 17:10:05 INFO TransactionManager: Transaction ending with 
> transaction owner Option{val=[==>20211007170717__commit__INFLIGHT]}
> 21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASING lock 
> atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
> 21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASED lock atZkBasePath 
> = /events/test/mwc/v1, lock key = events_mwc_test_v1
> 21/10/07 17:10:05 INFO TransactionManager: Transaction ended
> Exception in thread "main" java.lang.IllegalArgumentException
>         at 
> org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
>         at 
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:414)
>         at 
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:395)
>         at 
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
>         at 
> org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:218)
>         at 
> org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:190)
>         at 
> org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
>         at 
> org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:617)
>         at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
>         at 
> org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
>         at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
>         at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>         at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>         at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
>         at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
>         at 
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
>         at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
>         at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
>         at 
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
>         at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
>         at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
>         at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
>         at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
>         at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
>         at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
>         at 
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
> {code}
> The validation at _*ValidationUtils.checkArgument*_ fails because the 
> expected commit file was not present on DFS.
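> 
> For reference, {{ValidationUtils.checkArgument}} is a bare precondition 
> helper, which is why the exception above carries no message. A sketch of the 
> equivalent behavior (in scala, not the Hudi source verbatim):
> {code:scala}
> // A failed precondition throws a message-less IllegalArgumentException,
> // matching the bare exception at the top of the stack trace above.
> def checkArgument(expression: Boolean): Unit =
>   if (!expression) throw new IllegalArgumentException
> {code}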
>  
>  The test setup is this:
>  * Deltastreamer, continuous mode. Small batch sizes and very fast commit 
> times (high commit rate, every 10-30 seconds)
>  * A spark datasource writer moving flat parquet files from a source bucket 
> into the table maintained by the deltastreamer
>  * The spark datasource is much slower than the deltastreamer, so 
> time-to-first-commit is about 2-8 minutes (a config sketch of this writer 
> follows this list)
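> 
> A minimal sketch of the secondary writer, assuming the ZK OCC settings from 
> the docs linked above; the source/target paths, record key, precombine 
> field, and ZK endpoint are placeholders, while the lock base path and lock 
> key match the log output above:
> {code:scala}
> // Spark datasource write with ZK-based optimistic concurrency control
> // enabled, per the Hudi multi-writing docs. Not the exact failing job.
> import org.apache.spark.sql.{SaveMode, SparkSession}
> 
> val spark = SparkSession.builder().appName("hudi-secondary-writer").getOrCreate()
> val df = spark.read.parquet("s3://source-bucket/flat-parquet/") // placeholder
> 
> df.write.format("hudi").
>   option("hoodie.table.name", "events_mwc_test_v1").
>   option("hoodie.datasource.write.recordkey.field", "uuid"). // placeholder
>   option("hoodie.datasource.write.precombine.field", "ts").  // placeholder
>   option("hoodie.datasource.write.operation", "upsert").
>   // Multi-writer / OCC settings from the linked concurrency-control docs:
>   option("hoodie.write.concurrency.mode", "optimistic_concurrency_control").
>   option("hoodie.cleaner.policy.failed.writes", "LAZY").
>   option("hoodie.write.lock.provider",
>     "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
>   option("hoodie.write.lock.zookeeper.url", "zk-host").      // placeholder
>   option("hoodie.write.lock.zookeeper.port", "2181").
>   option("hoodie.write.lock.zookeeper.base_path", "/events/test/mwc/v1").
>   option("hoodie.write.lock.zookeeper.lock_key", "events_mwc_test_v1").
>   mode(SaveMode.Append).
>   save("s3://target-bucket/events_mwc_test_v1") // placeholder base path
> {code}
> 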
> What I see happening is the deltastreamer finalizing many commits while the 
> spark datasource is performing its write. The timeline changes so much, so 
> fast, that the spark datasource writer becomes "out of sync" in ways from 
> which it cannot recover. I see the {{Exception in thread "main" 
> java.lang.IllegalArgumentException}} error on the first commit of the 
> *datasource writer* every time. This appears to be a race condition that 
> surfaces when the rate of change of the hudi timeline is very high (due to a 
> fast deltastreamer process). The spark datasource does not properly sync 
> those changes in a multi-writer configuration, which causes enough 
> inconsistency to crash the job. 
>  


