[ 
https://issues.apache.org/jira/browse/HUDI-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425756#comment-17425756
 ] 

Dave Hagman commented on HUDI-2275:
-----------------------------------

[~shivnarayan] I hit the same error (above) even when running the jar built 
from your branch. First I get many errors (logged as warnings) about the 
timeline being inconsistent (caused by the deltastreamer making changes), and 
then the job ultimately fails on the call to _*checkArgument*_:

Error Details:
{code:java}
java.lang.IllegalArgumentException: Last known instant from client was 
20211007191403 but server has the following timeline 
[[20211005201655__rollback__COMPLETED], [20211005202809__rollback__COMPLETED], 
[20211006132623__rollback__COMPLETED], [20211007164718__rollback__COMPLETED], 
[20211007165545__rollback__COMPLETED], [20211007170747__rollback__COMPLETED], 
[20211007190436__commit__COMPLETED], [20211007190515__commit__COMPLETED], 
[20211007190542__clean__COMPLETED], [20211007190551__commit__COMPLETED], 
[20211007190614__clean__COMPLETED], [20211007190624__commit__COMPLETED], 
[20211007190646__clean__COMPLETED], [20211007190656__commit__COMPLETED], 
[20211007190719__clean__COMPLETED], [20211007190730__commit__COMPLETED], 
[20211007190753__clean__COMPLETED], [20211007190804__commit__COMPLETED], 
[20211007190828__clean__COMPLETED], [20211007190838__commit__COMPLETED], 
[20211007190900__clean__COMPLETED], [20211007190910__commit__COMPLETED], 
[20211007190933__clean__COMPLETED], [20211007190943__commit__COMPLETED], 
[20211007191009__clean__COMPLETED], [20211007191020__commit__COMPLETED], 
[20211007191046__clean__COMPLETED], [20211007191056__commit__COMPLETED], 
[20211007191118__clean__COMPLETED], [20211007191129__commit__COMPLETED], 
[20211007191151__clean__COMPLETED], [20211007191201__commit__COMPLETED], 
[20211007191223__clean__COMPLETED], [20211007191233__commit__COMPLETED], 
[20211007191257__clean__COMPLETED], [20211007191307__commit__COMPLETED], 
[20211007191329__clean__COMPLETED], [20211007191339__commit__COMPLETED], 
[20211007191403__clean__COMPLETED], [20211007191413__commit__COMPLETED], 
[20211007191438__clean__COMPLETED], [20211007191448__commit__COMPLETED], 
[20211007191510__clean__COMPLETED], [20211007191511__rollback__COMPLETED], 
[20211007191522__commit__COMPLETED], [20211007191546__clean__COMPLETED], 
[20211007191556__commit__COMPLETED], [20211007191624__clean__COMPLETED], 
[20211007191634__commit__COMPLETED], [20211007191704__clean__COMPLETED]]
        at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
        at org.apache.hudi.timeline.service.RequestHandler$ViewHandler.handle(RequestHandler.java:510)
        at io.javalin.security.SecurityUtil.noopAccessManager(SecurityUtil.kt:22)
        at io.javalin.Javalin.lambda$addHandler$0(Javalin.java:606)
        at io.javalin.core.JavalinServlet$service$2$1.invoke(JavalinServlet.kt:46)
        at io.javalin.core.JavalinServlet$service$2$1.invoke(JavalinServlet.kt:17)
        at io.javalin.core.JavalinServlet$service$1.invoke(JavalinServlet.kt:143)
        at io.javalin.core.JavalinServlet$service$2.invoke(JavalinServlet.kt:41)
        at io.javalin.core.JavalinServlet.service(JavalinServlet.kt:107)
        at io.javalin.core.util.JettyServerUtil$initialize$httpHandler$1.doHandle(JettyServerUtil.kt:72)
        at org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
        at org.apache.hudi.org.apache.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
        at org.apache.hudi.org.apache.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1668)
        at org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
        at org.apache.hudi.org.apache.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
        at org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
        at org.apache.hudi.org.apache.jetty.server.handler.HandlerList.handle(HandlerList.java:61)
        at org.apache.hudi.org.apache.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174)
        at org.apache.hudi.org.apache.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at org.apache.hudi.org.apache.jetty.server.Server.handle(Server.java:502)
        at org.apache.hudi.org.apache.jetty.server.HttpChannel.handle(HttpChannel.java:370)
        at org.apache.hudi.org.apache.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
        at org.apache.hudi.org.apache.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
        at org.apache.hudi.org.apache.jetty.io.FillInterest.fillable(FillInterest.java:103)
        at org.apache.hudi.org.apache.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
        at org.apache.hudi.org.apache.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
        at org.apache.hudi.org.apache.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
        at org.apache.hudi.org.apache.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
        at org.apache.hudi.org.apache.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
        at org.apache.hudi.org.apache.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
        at org.apache.hudi.org.apache.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765)
        at org.apache.hudi.org.apache.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683)
        at java.lang.Thread.run(Thread.java:748)
{code}
and then the job fails with:
{code:java}
Exception in thread "main" java.lang.IllegalArgumentException
        at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:450)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:431)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
        at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:219)
        at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:191)
        at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
        at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:620)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
{code}
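
To make the first error easier to read, here is a minimal sketch that lists which instants in the server timeline are newer than the client's last known instant. It is a hypothetical diagnostic helper, not part of Hudi: the class name `TimelineDiff` and the method `newerThan` are made up for illustration, and it only assumes the `[<time>__<action>__<STATE>]` entry format visible in the exception message. Applied to the message above, the server timeline contains eleven instants after 20211007191403, including a rollback at 20211007191511.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Hudi class) for reading the exception message.
public class TimelineDiff {
    // Matches entries such as [20211007191413__commit__COMPLETED].
    private static final Pattern INSTANT =
        Pattern.compile("\\[(\\d+)__([a-z_]+)__([A-Z]+)\\]");

    /** Returns "time action" for every instant newer than lastKnownInstant. */
    public static List<String> newerThan(String serverTimeline, String lastKnownInstant) {
        List<String> newer = new ArrayList<>();
        Matcher m = INSTANT.matcher(serverTimeline);
        while (m.find()) {
            // Instant times in the message are fixed-width digit strings,
            // so string comparison matches chronological order.
            if (m.group(1).compareTo(lastKnownInstant) > 0) {
                newer.add(m.group(1) + " " + m.group(2));
            }
        }
        return newer;
    }

    public static void main(String[] args) {
        // Shortened excerpt of the timeline from the exception message.
        String timeline = "[[20211007191403__clean__COMPLETED], "
            + "[20211007191413__commit__COMPLETED], "
            + "[20211007191438__clean__COMPLETED]]";
        for (String instant : newerThan(timeline, "20211007191403")) {
            System.out.println(instant);
        }
    }
}
```

Pasting the full message text into `newerThan` shows how far the client's view of the timeline lags behind the server's at the time of the failure.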
 

> HoodieDeltaStreamerException when using OCC and a second concurrent writer
> --------------------------------------------------------------------------
>
>                 Key: HUDI-2275
>                 URL: https://issues.apache.org/jira/browse/HUDI-2275
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Dave Hagman
>            Assignee: Sagar Sumit
>            Priority: Critical
>              Labels: sev:critical
>             Fix For: 0.10.0
>
>
> I am trying to use [Optimistic Concurrency 
> Control|https://hudi.apache.org/docs/concurrency_control] to allow two 
> writers to update a single table simultaneously. The setup is:
>  * Writer A: Deltastreamer job consuming continuously from Kafka
>  * Writer B: A Spark datasource-based writer that is consuming parquet files 
> out of S3
>  * Table Type: Copy on Write
>  
> After a few commits from each writer, the deltastreamer fails with the 
> following exception:
>  
> {code:java}
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find 
> previous checkpoint. Please double check if this table was indeed built via 
> delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]}, 
> Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
>  "partitionToWriteStats" : {
>  ...{code}
>  
> What appears to be happening is a lack of commit isolation between the two 
> writers. Writer B (the Spark datasource writer) lands commits which are 
> eventually picked up by Writer A (the Delta Streamer). This is an issue 
> because the Delta Streamer needs checkpoint information, which the Spark 
> datasource does not include in its commits. My understanding was that OCC 
> was built for this very purpose (among others).
> OCC config for Delta Streamer:
> {code:java}
> hoodie.write.concurrency.mode=optimistic_concurrency_control
> hoodie.cleaner.policy.failed.writes=LAZY
> hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
> hoodie.write.lock.zookeeper.url=<zk_host>
> hoodie.write.lock.zookeeper.port=2181
> hoodie.write.lock.zookeeper.lock_key=writer_lock
> hoodie.write.lock.zookeeper.base_path=/hudi-write-locks{code}
>  
> OCC config for spark datasource:
> {code:java}
> // Multi-writer concurrency
> .option("hoodie.cleaner.policy.failed.writes", "LAZY")
> .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
> .option(
>     "hoodie.write.lock.provider",
>     org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
> )
> .option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
> .option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
> .option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
> .option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks"){code}
> h3. Steps to Reproduce:
>  * Start a deltastreamer job against some table Foo
>  * In parallel, start writing to the same table Foo using the Spark 
> datasource writer
>  * Note that after a few commits from each writer, the deltastreamer is 
> likely to fail with the above exception when the datasource writer creates 
> non-isolated inflight commits
> NOTE: I have not tested this with two writers of the same kind (e.g. two 
> deltastreamer jobs).
> NOTE 2: Another detail that may be relevant is that the two writers are on 
> completely different Spark clusters, but I assumed this shouldn't be an issue 
> since we're locking using Zookeeper.
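
The checkpoint failure in the quoted description can be illustrated with a small sketch: walk the completed commits from newest to oldest and return the first one that carries a checkpoint, skipping commits written by the datasource writer. This is not Hudi's actual implementation. The class `CheckpointLookup` is hypothetical, each map stands in for the extra metadata of one commit, and the key name `deltastreamer.checkpoint.key` is an assumption about where the deltastreamer stores its checkpoint.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch (not a Hudi class) of a checkpoint lookup that
// tolerates commits from a writer that records no checkpoint.
public class CheckpointLookup {
    // Assumed metadata key under which the deltastreamer stores its checkpoint.
    public static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

    /**
     * Scans commit metadata from newest to oldest and returns the first
     * non-empty checkpoint found, or empty if no commit carries one.
     */
    public static Optional<String> latestCheckpoint(List<Map<String, String>> commitsOldestFirst) {
        for (int i = commitsOldestFirst.size() - 1; i >= 0; i--) {
            String checkpoint = commitsOldestFirst.get(i).get(CHECKPOINT_KEY);
            if (checkpoint != null && !checkpoint.isEmpty()) {
                return Optional.of(checkpoint);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Commit from Writer A (deltastreamer); the checkpoint value is made up.
        Map<String, String> deltaStreamerCommit = new HashMap<>();
        deltaStreamerCommit.put(CHECKPOINT_KEY, "example-checkpoint-1");

        // Commit from Writer B (datasource writer): no checkpoint entry.
        Map<String, String> datasourceCommit = new HashMap<>();

        List<Map<String, String>> commits =
            Arrays.asList(deltaStreamerCommit, datasourceCommit);
        System.out.println(latestCheckpoint(commits));
    }
}
```

In this model, looking only at the newest commit would find nothing whenever Writer B committed last, which matches the symptom in the exception; scanning further back recovers Writer A's most recent checkpoint.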



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
