[ https://issues.apache.org/jira/browse/HUDI-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17428499#comment-17428499 ]
Dave Hagman commented on HUDI-2549:
-----------------------------------

While continuing to test, I found that the _*FileAlreadyExistsException*_ can occur on both the deltastreamer and secondary writers (Spark datasource writers in my tests). On my latest run the Spark datasource writer created a commit "ahead" of the deltastreamer. This resulted in the deltastreamer failing with the same error as before:

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:s3://....

This also caused a more insidious issue: the deltastreamer checkpoint state is now missing from recent commits, and it is therefore unable to start.

[~shivnarayan] [~vinoth] Can you confirm that you are able to reproduce this issue? I remember seeing that you have run this exact configuration without issue before. If that is the case, I am quite confused as to why it would not work for me on a brand-new table.

> Exceptions when using second writer into Hudi table managed by DeltaStreamer
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-2549
>                 URL: https://issues.apache.org/jira/browse/HUDI-2549
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>            Reporter: Dave Hagman
>            Assignee: Dave Hagman
>            Priority: Critical
>              Labels: multi-writer, sev:critical
>             Fix For: 0.10.0
>
>
> When running the DeltaStreamer along with a second Spark datasource writer
> (with [ZK-based OCC enabled|https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing]),
> we receive the following exception (which halts the Spark datasource writer).
> This occurs following warnings of timeline inconsistencies:
>
> {code:java}
> 21/10/07 17:10:05 INFO TransactionManager: Transaction ending with transaction owner Option{val=[==>20211007170717__commit__INFLIGHT]}
> 21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASING lock atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
> 21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASED lock atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
> 21/10/07 17:10:05 INFO TransactionManager: Transaction ended
> Exception in thread "main" java.lang.IllegalArgumentException
> 	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
> 	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:414)
> 	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:395)
> 	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
> 	at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:218)
> 	at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:190)
> 	at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
> 	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:617)
> 	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
> 	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
> 	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
> 	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
> 	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
> 	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
> 	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
> 	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
> 	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
> 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
> 	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
> 	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
> 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
> 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
> 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
> 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
> 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
> 	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
> {code}
>
> The validation at _*ValidationUtils.checkArgument*_ fails because the
> expected commit file was not present on DFS.
>
> The test setup is this:
> * Deltastreamer in continuous mode, with small batch sizes and very fast commit times (a high commit rate: every 10-30 seconds)
> * A Spark datasource writer moving flat parquet files from a source bucket into the table maintained by the deltastreamer
> * The Spark datasource writer is much slower than the deltastreamer, so its time-to-first-commit is about 2-8 minutes
>
> What I see happen is the deltastreamer finalizing many commits while the
> Spark datasource writer is performing its write. The timeline appears to
> change so much, so fast, that the Spark datasource writer becomes "out of
> sync" in ways from which it cannot recover. I see the {{Exception in thread
> "main" java.lang.IllegalArgumentException}} error on the first commit of the
> *datasource writer* every time. This appears to be a race condition that
> arises when the rate of change of the Hudi timeline is very high (due to a
> fast deltastreamer process). The Spark datasource writer does not properly
> sync those changes in a multi-writer configuration, which causes enough
> inconsistency to crash the job.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
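The failing check is a bare boolean assertion, which is why the IllegalArgumentException above carries no message. Below is a minimal, self-contained sketch of the failure mode, assuming (inferred from the stack trace, not taken from the Hudi source) that transitionState verifies the inflight instant file still exists before completing it; the class and method names here are my own simplifications, not Hudi's:

```java
import java.util.HashSet;
import java.util.Set;

public class TransitionSketch {
  // Mimics ValidationUtils.checkArgument(boolean): throws with no message,
  // which matches the bare IllegalArgumentException in the trace above.
  static void checkArgument(boolean expression) {
    if (!expression) {
      throw new IllegalArgumentException();
    }
  }

  // Simplified stand-in for the timeline: the set of instant files on DFS.
  // Fails when another writer has already moved/removed the inflight instant.
  static void transitionToComplete(Set<String> instantFiles, String inflight) {
    checkArgument(instantFiles.contains(inflight));
    instantFiles.remove(inflight);
    instantFiles.add(inflight.replace(".inflight", ".commit"));
  }

  public static void main(String[] args) {
    Set<String> files = new HashSet<>();
    files.add("20211007170717.inflight");
    transitionToComplete(files, "20211007170717.inflight"); // succeeds

    boolean failed = false;
    try {
      // Second attempt: the inflight file is gone (as if a concurrent writer
      // raced us), reproducing the message-less exception.
      transitionToComplete(files, "20211007170717.inflight");
    } catch (IllegalArgumentException e) {
      failed = true;
    }
    System.out.println(failed); // expected: true
  }
}
```

This would explain why the error gives no hint about which instant was missing: the assertion has no message argument.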
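For reference, the ZK-based OCC setup on the second writer boils down to options like the following. This is a sketch assuming the lock-provider keys from the linked concurrency-control docs; the ZK base path and lock key are taken from the log lines above, and the ZK host/port are illustrative placeholders:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MultiWriterOptions {
  // Options to pass to the Spark datasource writer (df.write().format("hudi").options(...)).
  public static Map<String, String> occOptions() {
    Map<String, String> opts = new LinkedHashMap<>();
    // Enable optimistic concurrency control for multi-writing.
    opts.put("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
    opts.put("hoodie.cleaner.policy.failed.writes", "LAZY");
    // ZooKeeper-based lock provider.
    opts.put("hoodie.write.lock.provider",
        "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
    opts.put("hoodie.write.lock.zookeeper.url", "zk-host"); // illustrative
    opts.put("hoodie.write.lock.zookeeper.port", "2181");   // illustrative
    // Lock paths matching the RELEASING/RELEASED log lines above.
    opts.put("hoodie.write.lock.zookeeper.base_path", "/events/test/mwc/v1");
    opts.put("hoodie.write.lock.zookeeper.lock_key", "events_mwc_test_v1");
    return opts;
  }
}
```

Note that per the docs both writers (the deltastreamer and the datasource writer) need the same lock-provider configuration for the locks to actually coordinate them.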