[ 
https://issues.apache.org/jira/browse/HUDI-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17396727#comment-17396727
 ] 

Vinoth Chandar commented on HUDI-2275:
--------------------------------------

[~dave_hagman] thanks for filing this and bringing it to my notice. The issue 
seems to be more about the Spark-based datasource writer not populating the 
Kafka checkpoints that are maintained in the commit files. IIRC there was a 
fix to keep looking backwards through the commits until we find the latest 
one that holds the last checkpoint written by delta streamer. Is this on 
master? (I assume so)
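
For anyone following along, the look-back idea can be sketched roughly as 
below. This is only an illustration, not the actual DeltaStreamer code: the 
Commit class, the method name, the earlier instant time and the checkpoint 
value are all hypothetical, and the metadata key is assumed to be 
"deltastreamer.checkpoint.key".

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

class CheckpointLookback {
    // Assumed name of the metadata key under which a checkpoint is stored.
    static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

    // Hypothetical stand-in for a completed commit and its extra metadata.
    static final class Commit {
        final String instantTime;
        final Map<String, String> extraMetadata;

        Commit(String instantTime, Map<String, String> extraMetadata) {
            this.instantTime = instantTime;
            this.extraMetadata = extraMetadata;
        }
    }

    // Walk the completed commits from newest to oldest and return the first
    // checkpoint found, skipping commits that carry none (for example those
    // written by a datasource writer).
    static Optional<String> findLatestCheckpoint(List<Commit> commitsOldestFirst) {
        for (int i = commitsOldestFirst.size() - 1; i >= 0; i--) {
            String checkpoint = commitsOldestFirst.get(i).extraMetadata.get(CHECKPOINT_KEY);
            if (checkpoint != null && !checkpoint.isEmpty()) {
                return Optional.of(checkpoint);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Commit> timeline = List.of(
            // Hypothetical earlier delta streamer commit with a made-up checkpoint.
            new Commit("20210803165500", Map.of(CHECKPOINT_KEY, "events,0:100")),
            // Latest commit from the report; no checkpoint in its metadata.
            new Commit("20210803165741", Map.of()));
        System.out.println(findLatestCheckpoint(timeline).orElse("<none>"));
    }
}
```

With a walk like this, a checkpoint-less latest commit would not by itself 
trigger the "Unable to find previous checkpoint" error, as long as some 
earlier commit carries a checkpoint.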

cc [~nishith29] to chime in here. This seems like the exact scenario we 
tested? [~vino] this is the same as running a backfill alongside the regular 
delta streamer, and it should have worked, right?

> HoodieDeltaStreamerException when using OCC and a second concurrent writer
> --------------------------------------------------------------------------
>
>                 Key: HUDI-2275
>                 URL: https://issues.apache.org/jira/browse/HUDI-2275
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Dave Hagman
>            Priority: Critical
>             Fix For: 0.10.0
>
>
>  I am trying to utilize [Optimistic Concurrency 
> Control|https://hudi.apache.org/docs/concurrency_control] in order to allow 
> two writers to update a single table simultaneously. The two writers are:
>  * Writer A: Deltastreamer job consuming continuously from Kafka
>  * Writer B: A spark datasource-based writer that is consuming parquet files 
> out of S3
>  * Table Type: Copy on Write
>  
> After a few commits from each writer the deltastreamer will fail with the 
> following exception:
>  
> {code:java}
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find 
> previous checkpoint. Please double check if this table was indeed built via 
> delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]}, 
> Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
>  "partitionToWriteStats" : {
>  ...{code}
>  
> What appears to be happening is a lack of commit isolation between the two 
> writers.
>  Writer B (spark datasource writer) will land commits which are eventually 
> picked up by Writer A (Delta Streamer). This is an issue because the Delta 
> Streamer needs checkpoint information which the spark datasource of course 
> does not include in its commits. My understanding was that OCC was built for 
> this very purpose (among others). 
> OCC config for Delta Streamer:
> {code:java}
> hoodie.write.concurrency.mode=optimistic_concurrency_control
> hoodie.cleaner.policy.failed.writes=LAZY
> hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
> hoodie.write.lock.zookeeper.url=<zk_host>
> hoodie.write.lock.zookeeper.port=2181
> hoodie.write.lock.zookeeper.lock_key=writer_lock
> hoodie.write.lock.zookeeper.base_path=/hudi-write-locks
> {code}
>  
> OCC config for spark datasource:
> {code:java}
> // Multi-writer concurrency
> .option("hoodie.cleaner.policy.failed.writes", "LAZY")
> .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
> .option(
>   "hoodie.write.lock.provider",
>   org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
> )
> .option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
> .option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
> .option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
> .option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks")
> {code}
> h3. Steps to Reproduce:
>  * Start a deltastreamer job against some table Foo
>  * In parallel, start writing to the same table Foo using spark datasource 
> writer
>  * Note that after a few commits from each, the deltastreamer is likely to 
> fail with the above exception when the datasource writer creates non-isolated 
> inflight commits
> NOTE: I have not tested this with two writers of the same type (e.g. two 
> deltastreamer jobs)
> NOTE 2: Another detail that may be relevant is that the two writers are on 
> completely different Spark clusters, but I assumed this shouldn't be an issue 
> since we're locking using Zookeeper
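
Since both writers have to agree on the lock settings for OCC to take effect, 
one way to keep the two configs above from drifting apart is to build them 
from a single place. The sketch below is a suggestion only: the OccOptions 
class and its method names are hypothetical, while the keys and values are 
copied from the configs quoted above. It does not address the missing 
checkpoint itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class OccOptions {
    // Build the multi-writer options once so both writers share identical values.
    static Map<String, String> occOptions(String zkHost, String zkPort) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
        opts.put("hoodie.cleaner.policy.failed.writes", "LAZY");
        opts.put("hoodie.write.lock.provider",
            "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
        opts.put("hoodie.write.lock.zookeeper.url", zkHost);
        opts.put("hoodie.write.lock.zookeeper.port", zkPort);
        opts.put("hoodie.write.lock.zookeeper.lock_key", "writer_lock");
        opts.put("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks");
        return opts;
    }

    // Render the same options as key=value lines for a properties file.
    static String toProperties(Map<String, String> opts) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : opts.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // "zk.example.internal" is a placeholder host, not taken from the report.
        System.out.print(toProperties(occOptions("zk.example.internal", "2181")));
    }
}
```

The map could be passed to the datasource writer through 
DataFrameWriter.options(...), and the rendered key=value lines could go into 
the properties file used by the delta streamer job.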



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
