[ https://issues.apache.org/jira/browse/HUDI-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17396830#comment-17396830 ]

Dave Hagman edited comment on HUDI-2275 at 8/10/21, 6:25 PM:
-------------------------------------------------------------

[~vinoth] OK that clarifies how multiple writers wouldn't steamroll each 
other's commits (that was definitely a gap in my understanding). 

The primary error that I am getting is in fact due to the commit file 
differences between deltastreamer (source = Kafka) and a spark datasource 
consuming in batch from static S3 data. 

 
{quote} IIRC there was a fix to keep looking backwards for the latest commit 
time where you'd find the last checkpoint written by delta streamer. Is this on 
master?
{quote}
I did not see anything doing this during my deep dive into the DeltaSync code, 
but I definitely could have missed something. Crawling backwards and opening 
every commit to find the checkpoint info seems like it could be very 
inefficient, especially for configurations with a large number of retained 
commits. Were any issues found there during performance testing? 
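
For reference, here is roughly what I picture that backwards crawl looking like 
against the timeline API (untested sketch; the checkpoint key name and the exact 
metaclient/timeline calls are from memory, so treat them as assumptions):
{code:java}
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

public class CheckpointCrawlSketch {
  // Walk completed commits newest-to-oldest and stop at the first one whose
  // extraMetadata carries a deltastreamer checkpoint. Every miss costs one
  // commit-metadata read, so a datasource-heavy timeline makes this expensive.
  static Option<String> findLatestCheckpoint(Configuration hadoopConf, String basePath)
      throws IOException {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf)
        .setBasePath(basePath)
        .build();
    List<HoodieInstant> newestFirst = metaClient.getActiveTimeline()
        .getCommitsTimeline()
        .filterCompletedInstants()
        .getReverseOrderedInstants()
        .collect(Collectors.toList());
    for (HoodieInstant instant : newestFirst) {
      HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(
          metaClient.getActiveTimeline().getInstantDetails(instant).get(),
          HoodieCommitMetadata.class);
      // "deltastreamer.checkpoint.key" is what I believe DeltaSync writes into
      // extraMetadata; adjust if the key differs on master.
      String checkpoint = meta.getMetadata("deltastreamer.checkpoint.key");
      if (checkpoint != null && !checkpoint.isEmpty()) {
        return Option.of(checkpoint);
      }
    }
    return Option.empty();
  }
}
{code}
If that is more or less what the fix on master does, the worst case is one 
metadata read per retained commit, which is why I'm asking about the performance 
testing.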



> HoodieDeltaStreamerException when using OCC and a second concurrent writer
> --------------------------------------------------------------------------
>
>                 Key: HUDI-2275
>                 URL: https://issues.apache.org/jira/browse/HUDI-2275
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Dave Hagman
>            Assignee: Sagar Sumit
>            Priority: Critical
>             Fix For: 0.10.0
>
>
>  I am trying to utilize [Optimistic Concurrency 
> Control|https://hudi.apache.org/docs/concurrency_control] in order to allow 
> two writers to update a single table simultaneously. The two writers are:
>  * Writer A: Deltastreamer job consuming continuously from Kafka
>  * Writer B: A spark datasource-based writer that is consuming parquet files 
> out of S3
>  * Table Type: Copy on Write
>  
> After a few commits from each writer the deltastreamer will fail with the 
> following exception:
>  
> {code:java}
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find 
> previous checkpoint. Please double check if this table was indeed built via 
> delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]}, 
> Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
>  "partitionToWriteStats" : {
>  ...{code}
>  
> What appears to be happening is a lack of commit isolation between the two 
> writers. Writer B (the spark datasource writer) will land commits which are 
> eventually picked up by Writer A (the Delta Streamer). This is an issue because 
> the Delta Streamer needs checkpoint information, which the spark datasource of 
> course does not include in its commits. My understanding was that OCC was built 
> for this very purpose (among others). 
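> A quick way to see the difference (untested sketch run from the driver; the 
> table path is a placeholder and the checkpoint key name is an assumption on my 
> part) is to dump the extraMetadata of the latest completed commit:
> {code:java}
> // Untested sketch: print the extraMetadata of the latest completed commit.
> // `spark` is the active SparkSession and the base path is a placeholder;
> // HoodieCommitMetadata.fromBytes throws IOException, so wrap/declare as needed.
> HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
>     .setConf(spark.sparkContext().hadoopConfiguration())
>     .setBasePath("s3://bucket/path/to/foo")
>     .build();
> HoodieInstant latest = metaClient.getActiveTimeline()
>     .getCommitsTimeline()
>     .filterCompletedInstants()
>     .lastInstant()
>     .get();
> HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(
>     metaClient.getActiveTimeline().getInstantDetails(latest).get(),
>     HoodieCommitMetadata.class);
> // A deltastreamer commit shows a "deltastreamer.checkpoint.key" entry here;
> // a commit from the plain spark datasource writer does not.
> System.out.println(latest + " -> " + meta.getExtraMetadata());
> {code}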
> OCC config for Delta Streamer:
> {code:java}
> hoodie.write.concurrency.mode=optimistic_concurrency_control
> hoodie.cleaner.policy.failed.writes=LAZY
> hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
> hoodie.write.lock.zookeeper.url=<zk_host>
> hoodie.write.lock.zookeeper.port=2181
> hoodie.write.lock.zookeeper.lock_key=writer_lock
> hoodie.write.lock.zookeeper.base_path=/hudi-write-locks
> {code}
>  
> OCC config for spark datasource:
> {code:java}
> // Multi-writer concurrency
> .option("hoodie.cleaner.policy.failed.writes", "LAZY")
> .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
> .option(
>   "hoodie.write.lock.provider",
>   org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
> )
> .option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
> .option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
> .option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
> .option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks")
> {code}
> h3. Steps to Reproduce:
>  * Start a deltastreamer job against some table Foo
>  * In parallel, start writing to the same table Foo using spark datasource 
> writer
>  * Note that after a few commits from each, the deltastreamer is likely to fail 
> with the above exception when the datasource writer creates non-isolated 
> inflight commits
> NOTE: I have not tested this with two of the same datasources (e.g. two 
> deltastreamer jobs)
> NOTE 2: Another detail that may be relevant is that the two writers are on 
> completely different spark clusters, but I assumed this shouldn't be an issue 
> since we're locking via Zookeeper


