[ 
https://issues.apache.org/jira/browse/HUDI-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398090#comment-17398090
 ] 

Dave Hagman commented on HUDI-2275:
-----------------------------------

Continuing to investigate this, I found something potentially interesting. I 
added the following log output to the _*TransactionUtils#overrideWithLatestCommitMetadata*_ 
method in order to print out the previous instant's extra metadata (where the 
checkpoint metadata should live):
{code:java}
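// Debug logging added inside TransactionUtils#overrideWithLatestCommitMetadata
// to dump the previous instant's extra metadata (lastInstant is resolved earlier in the method)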
Map<String, String> extra = lastInstant.get().getRight();
LOG.info("******************* PRINTING EXTRA (lastInstant) ******************");
for (Map.Entry<String, String> entry : extra.entrySet()) {
    LOG.info(entry.getKey() + "=" + entry.getValue());
}
LOG.info("******************* DONE PRINTING EXTRA ******************");
{code}
I then performed the following steps:
 # Start DeltaStreamer and allow it to complete 2 commits
 # Start the spark backfill job with the write meta key prefix set (a sketch of this writer follows this list)
 # Allow the two concurrent writers to work until the inevitable failure of 
DeltaStreamer occurs
 # Let the spark backfill job continue to work with the DeltaStreamer offline

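For context on step 2, this is roughly what the backfill writer looks like. It is a simplified sketch, not the exact job: the input path, table name, record key and partition path fields are placeholders, and I am assuming hoodie.write.meta.key.prefixes is the "write meta key prefix" option referred to above.
{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class BackfillSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("hudi-backfill-sketch").getOrCreate();

    // Placeholder input: parquet files being backfilled out of S3
    Dataset<Row> df = spark.read().parquet("s3://datalake/backfill-input/");

    df.write()
        .format("hudi")
        .option("hoodie.table.name", "events")
        .option("hoodie.datasource.write.recordkey.field", "event_id")                // placeholder
        .option("hoodie.datasource.write.partitionpath.field", "year,month,day,hour") // placeholder
        // Same OCC settings as in the issue description
        .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
        .option("hoodie.cleaner.policy.failed.writes", "LAZY")
        .option("hoodie.write.lock.provider",
            "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
        .option("hoodie.write.lock.zookeeper.url", "<zk_host>")
        .option("hoodie.write.lock.zookeeper.port", "2181")
        .option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
        .option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks")
        // Assumed knob: carry forward extra metadata keys with this prefix across commits
        .option("hoodie.write.meta.key.prefixes", "deltastreamer.checkpoint.key")
        .mode(SaveMode.Append)
        .save("s3://datalake/events");
  }
}
{code}
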
What I noticed is:
 # With DeltaStreamer running I see *many* instances of the following error:
 ## 
{noformat}
21/08/12 12:59:34 ERROR RequestHandler: Got runtime exception servicing request partition=year%3D2014%2Fmonth%3D2%2Fday%3D16%2Fhour%3D0&basepath=s3%3A%2F%2Fdatalake%2Fevents&fileid=8d6dbf2f-e55e-49cc-9de0-ada768c0978e-0&lastinstantts=20210812124201&timelinehash=e82c67277be5f2636a05a47aab62f681ee74929ad086f360cf23dd076ccce0c8
java.lang.IllegalArgumentException: Last known instant from client was 20210812124201 but server has the following timeline [[20210812121050__commit__COMPLETED], [20210812121143__commit__COMPLETED], [20210812121328__commit__COMPLETED], [20210812121518__commit__COMPLETED], [20210812121707__commit__COMPLETED], [20210812121857__commit__COMPLETED], [20210812122133__commit__COMPLETED], [20210812122229__commit__COMPLETED], [20210812122421__commit__COMPLETED], [20210812122650__commit__COMPLETED], [20210812122907__commit__COMPLETED], [20210812123106__commit__COMPLETED], [20210812123300__commit__COMPLETED], [20210812123401__clean__COMPLETED], [20210812123412__commit__COMPLETED], [20210812123454__clean__COMPLETED], [20210812123459__commit__COMPLETED], [20210812123549__clean__COMPLETED], [20210812123745__clean__COMPLETED], [20210812123752__commit__COMPLETED], [20210812123949__clean__COMPLETED], [20210812123950__rollback__COMPLETED], [20210812123956__commit__COMPLETED], [20210812124201__clean__COMPLETED], [20210812124208__commit__COMPLETED], [20210812124413__commit__COMPLETED], [20210812124612__clean__COMPLETED], [20210812124619__commit__COMPLETED], [20210812124824__clean__COMPLETED], [20210812124831__commit__COMPLETED], [20210812125109__clean__COMPLETED], [20210812125116__commit__COMPLETED], [20210812125322__clean__COMPLETED], [20210812125329__commit__COMPLETED], [20210812125530__clean__COMPLETED]]
        at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
        at org.apache.hudi.timeline.service.RequestHandler$ViewHandler.handle(RequestHandler.java:443){noformat}

 # Once the DeltaStreamer stops, the timeline errors above eventually stop
 # I then see output from my log statements that includes the checkpoint metadata
 ## 
{noformat}
21/08/12 13:01:42 INFO S3NativeFileSystem: Opening 's3://datalake/events/.hoodie/20210812125329.commit' for reading
21/08/12 13:01:42 INFO TransactionUtils: ******************* PRINTING EXTRA (lastInstant) ******************
21/08/12 13:01:42 INFO TransactionUtils: schema=...
21/08/12 13:01:42 INFO TransactionUtils: deltastreamer.checkpoint.key=topic-name,0:100,1:150,...
21/08/12 13:01:42 INFO TransactionUtils: ******************* DONE PRINTING EXTRA ******************
21/08/12 13:01:42 INFO MultipartUploadOutputStream: close closed:false s3://datalake/events/.hoodie/20210812130006.commit{noformat}

 ## Last instant: *20210812125329*, next instant: *20210812130006*
 # Eventually, for some reason, the checkpoint metadata is no longer carried 
over from the last instant. I start to see logs like the following:
 ## 
{noformat}
21/08/12 13:03:27 INFO S3NativeFileSystem: Opening 's3://datalake/events/.hoodie/20210812130006.commit' for reading
21/08/12 13:03:27 INFO TransactionUtils: ******************* PRINTING EXTRA (lastInstant) ******************
21/08/12 13:03:27 INFO TransactionUtils: schema=...
21/08/12 13:03:27 INFO TransactionUtils: ******************* DONE PRINTING EXTRA ******************
21/08/12 13:03:27 INFO MultipartUploadOutputStream: close closed:false s3://datalake/events/.hoodie/20210812130150.commit{noformat}

 ## Notice how "schema" is now the only key in the extra metadata from the last 
instant (now: *20210812130006*)

 

Now here is the interesting part. Between steps 3 and 5 (where the checkpoint 
metadata is lost) there was a Spark RDD error, and it appears that the previous 
commit that should have had the checkpoint metadata carried over (instant: 
*20210812130006*) now does not contain it.
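
For what it's worth, the way I understand the carry-over to behave is roughly the sketch below. This is my mental model, not the actual TransactionUtils implementation; the method and variable names are made up for illustration.
{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointCarryOverSketch {

  // Sketch: merge the last completed instant's extraMetadata into the current
  // commit's metadata, but only for keys matching the configured prefixes
  // (e.g. "deltastreamer.checkpoint.key").
  static Map<String, String> mergeWithLastInstant(
      Map<String, String> currentExtraMetadata,
      Map<String, String> lastInstantExtraMetadata,
      List<String> writeMetaKeyPrefixes) {
    Map<String, String> merged = new HashMap<>(currentExtraMetadata);
    for (Map.Entry<String, String> entry : lastInstantExtraMetadata.entrySet()) {
      boolean matchesPrefix =
          writeMetaKeyPrefixes.stream().anyMatch(prefix -> entry.getKey().startsWith(prefix));
      if (matchesPrefix) {
        merged.put(entry.getKey(), entry.getValue());
      }
    }
    return merged;
  }
}
{code}
If that model is close to what really happens, then a single completed commit whose extra metadata lacks deltastreamer.checkpoint.key (such as the one written around the RDD error) is enough for the checkpoint to disappear from every later commit, which matches the log output above.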

 

> HoodieDeltaStreamerException when using OCC and a second concurrent writer
> --------------------------------------------------------------------------
>
>                 Key: HUDI-2275
>                 URL: https://issues.apache.org/jira/browse/HUDI-2275
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Dave Hagman
>            Assignee: Sagar Sumit
>            Priority: Critical
>             Fix For: 0.10.0
>
>
>  I am trying to utilize [Optimistic Concurrency 
> Control|https://hudi.apache.org/docs/concurrency_control] in order to allow 
> two writers to update a single table simultaneously. The two writers are:
>  * Writer A: Deltastreamer job consuming continuously from Kafka
>  * Writer B: A spark datasource-based writer that is consuming parquet files 
> out of S3
>  * Table Type: Copy on Write
>  
> After a few commits from each writer the deltastreamer will fail with the 
> following exception:
>  
> {code:java}
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find 
> previous checkpoint. Please double check if this table was indeed built via 
> delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]}, 
> Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
>  "partitionToWriteStats" : {
>  ...{code}
>  
> What appears to be happening is a lack of commit isolation between the two 
> writers: Writer B (the spark datasource writer) lands commits which are eventually 
> picked up by Writer A (the Delta Streamer). This is an issue because the Delta 
> Streamer needs checkpoint information, which the spark datasource of course 
> does not include in its commits. My understanding was that OCC was built for 
> this very purpose (among others). 
> OCC config for Delta Streamer:
> {code:java}
> hoodie.write.concurrency.mode=optimistic_concurrency_control
> hoodie.cleaner.policy.failed.writes=LAZY
> hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
> hoodie.write.lock.zookeeper.url=<zk_host>
> hoodie.write.lock.zookeeper.port=2181
> hoodie.write.lock.zookeeper.lock_key=writer_lock
> hoodie.write.lock.zookeeper.base_path=/hudi-write-locks{code}
>  
> OCC config for spark datasource:
> {code:java}
> // Multi-writer concurrency
> .option("hoodie.cleaner.policy.failed.writes", "LAZY")
> .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
> .option(
>   "hoodie.write.lock.provider",
>   org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
> )
> .option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
> .option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
> .option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
> .option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks"){code}
> h3. Steps to Reproduce:
>  * Start a deltastreamer job against some table Foo
>  * In parallel, start writing to the same table Foo using spark datasource 
> writer
>  * Note that after a few commits from each writer, the deltastreamer is likely to 
> fail with the above exception when the datasource writer creates non-isolated 
> inflight commits
> NOTE: I have not tested this with two of the same datasources (ex. two 
> deltastreamer jobs)
> NOTE 2: Another detail that may be relevant is that the two writers are on 
> completely different spark clusters but I assumed this shouldn't be an issue 
> since we're locking using Zookeeper



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
