[ https://issues.apache.org/jira/browse/HUDI-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398090#comment-17398090 ]
Dave Hagman commented on HUDI-2275:
-----------------------------------

I continued investigating this and found something potentially interesting. I added the following log output to the *TransactionUtils#overrideWithLatestCommitMetadata* method in order to print the previous instant's extra metadata (where the checkpoint metadata should live):
{code:java}
Map<String, String> extra = lastInstant.get().getRight();
LOG.info("******************* PRINTING EXTRA (lastInstant) ******************");
for (Map.Entry<String, String> entry : extra.entrySet()) {
  LOG.info(entry.getKey() + "=" + entry.getValue());
}
LOG.info("******************* DONE PRINTING EXTRA ******************");
{code}
I then performed the following steps:
# Start DeltaStreamer and allow it to complete 2 commits
# Start the Spark backfill job with the write meta key prefix set
# Allow the two concurrent writers to work until the inevitable DeltaStreamer failure occurs
# Let the Spark backfill job continue working with DeltaStreamer offline

What I noticed is:
# With DeltaStreamer running, I see *many* instances of the following error:
## {noformat}21/08/12 12:59:34 ERROR RequestHandler: Got runtime exception servicing request partition=year%3D2014%2Fmonth%3D2%2Fday%3D16%2Fhour%3D0&basepath=s3%3A%2F%2Fdatalake%2Fevents&fileid=8d6dbf2f-e55e-49cc-9de0-ada768c0978e-0&lastinstantts=20210812124201&timelinehash=e82c67277be5f2636a05a47aab62f681ee74929ad086f360cf23dd076ccce0c8
java.lang.IllegalArgumentException: Last known instant from client was 20210812124201 but server has the following timeline [[20210812121050__commit__COMPLETED], [20210812121143__commit__COMPLETED], [20210812121328__commit__COMPLETED], [20210812121518__commit__COMPLETED], [20210812121707__commit__COMPLETED], [20210812121857__commit__COMPLETED], [20210812122133__commit__COMPLETED], [20210812122229__commit__COMPLETED], [20210812122421__commit__COMPLETED], [20210812122650__commit__COMPLETED], [20210812122907__commit__COMPLETED], [20210812123106__commit__COMPLETED], [20210812123300__commit__COMPLETED], [20210812123401__clean__COMPLETED], [20210812123412__commit__COMPLETED], [20210812123454__clean__COMPLETED], [20210812123459__commit__COMPLETED], [20210812123549__clean__COMPLETED], [20210812123745__clean__COMPLETED], [20210812123752__commit__COMPLETED], [20210812123949__clean__COMPLETED], [20210812123950__rollback__COMPLETED], [20210812123956__commit__COMPLETED], [20210812124201__clean__COMPLETED], [20210812124208__commit__COMPLETED], [20210812124413__commit__COMPLETED], [20210812124612__clean__COMPLETED], [20210812124619__commit__COMPLETED], [20210812124824__clean__COMPLETED], [20210812124831__commit__COMPLETED], [20210812125109__clean__COMPLETED], [20210812125116__commit__COMPLETED], [20210812125322__clean__COMPLETED], [20210812125329__commit__COMPLETED], [20210812125530__clean__COMPLETED]]
at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
at org.apache.hudi.timeline.service.RequestHandler$ViewHandler.handle(RequestHandler.java:443){noformat}
# Once DeltaStreamer stops, the timeline errors above eventually stop
# I then see output from my log statements that includes the checkpoint metadata:
## {noformat}21/08/12 13:01:42 INFO S3NativeFileSystem: Opening 's3://datalake/events/.hoodie/20210812125329.commit' for reading
21/08/12 13:01:42 INFO TransactionUtils: ******************* PRINTING EXTRA (lastInstant) ******************
21/08/12 13:01:42 INFO TransactionUtils: schema=...
21/08/12 13:01:42 INFO TransactionUtils: deltastreamer.checkpoint.key=topic-name,0:100,1:150,...
21/08/12 13:01:42 INFO TransactionUtils: ******************* DONE PRINTING EXTRA ******************
21/08/12 13:01:42 INFO MultipartUploadOutputStream: close closed:false s3://datalake/events/.hoodie/20210812130006.commit{noformat}
## Last instant: *20210812125329*, next instant: *20210812130006*
# Eventually, the checkpoint metadata is no longer carried over from the last instant for some reason, and I start to see logs like the following:
## {noformat}21/08/12 13:03:27 INFO S3NativeFileSystem: Opening 's3://datalake/events/.hoodie/20210812130006.commit' for reading
21/08/12 13:03:27 INFO TransactionUtils: ******************* PRINTING EXTRA (lastInstant) ******************
21/08/12 13:03:27 INFO TransactionUtils: schema=...
21/08/12 13:03:27 INFO TransactionUtils: ******************* DONE PRINTING EXTRA ******************
21/08/12 13:03:27 INFO MultipartUploadOutputStream: close closed:false s3://datalake/events/.hoodie/20210812130150.commit{noformat}
## Notice how "schema" is now the only key in the extra metadata from the last instant (now: *20210812130006*)

Now here is the interesting part: between steps 3 and 5 (where the checkpoint metadata is lost) there was a Spark RDD error, and it appears that the previous commit that should have had the checkpoint metadata carried over (instant: *20210812130006*) now does not.


> HoodieDeltaStreamerException when using OCC and a second concurrent writer
> --------------------------------------------------------------------------
>
>                 Key: HUDI-2275
>                 URL: https://issues.apache.org/jira/browse/HUDI-2275
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Dave Hagman
>            Assignee: Sagar Sumit
>            Priority: Critical
>             Fix For: 0.10.0
>
> I am trying to utilize [Optimistic Concurrency Control|https://hudi.apache.org/docs/concurrency_control] in order to allow two writers to update a single table simultaneously.
> The two writers are:
> * Writer A: DeltaStreamer job consuming continuously from Kafka
> * Writer B: A Spark datasource-based writer that is consuming parquet files out of S3
> * Table Type: Copy on Write
>
> After a few commits from each writer, the DeltaStreamer will fail with the following exception:
>
> {code:java}
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]}, Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
>   "partitionToWriteStats" : {
>   ...{code}
>
> What appears to be happening is a lack of commit isolation between the two writers. Writer B (the Spark datasource writer) will land commits which are eventually picked up by Writer A (DeltaStreamer). This is an issue because DeltaStreamer needs checkpoint information, which the Spark datasource of course does not include in its commits. My understanding was that OCC was built for this very purpose (among others).
> OCC config for DeltaStreamer:
> {code:java}
> hoodie.write.concurrency.mode=optimistic_concurrency_control
> hoodie.cleaner.policy.failed.writes=LAZY
> hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
> hoodie.write.lock.zookeeper.url=<zk_host>
> hoodie.write.lock.zookeeper.port=2181
> hoodie.write.lock.zookeeper.lock_key=writer_lock
> hoodie.write.lock.zookeeper.base_path=/hudi-write-locks{code}
>
> OCC config for the Spark datasource:
> {code:java}
> // Multi-writer concurrency
> .option("hoodie.cleaner.policy.failed.writes", "LAZY")
> .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
> .option(
>   "hoodie.write.lock.provider",
>   org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
> )
> .option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
> .option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
> .option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
> .option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks"){code}
>
> h3. Steps to Reproduce:
> * Start a DeltaStreamer job against some table Foo
> * In parallel, start writing to the same table Foo using the Spark datasource writer
> * Note that after a few commits from each, the DeltaStreamer is likely to fail with the above exception when the datasource writer creates non-isolated inflight commits
>
> NOTE: I have not tested this with two of the same datasource (e.g. two DeltaStreamer jobs)
>
> NOTE 2: Another detail that may be relevant is that the two writers are on completely different Spark clusters, but I assumed this shouldn't be an issue since we're locking using Zookeeper



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
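The checkpoint-loss mechanism described in the comment above can be sketched in miniature. This is a hypothetical illustration, not Hudi's actual implementation: the class {{CheckpointLookup}}, its method names, and the in-memory timeline are invented for the example. It shows why reading {{deltastreamer.checkpoint.key}} only from the most recent completed commit fails as soon as another writer (which never writes that key) lands the latest commit, and how scanning the timeline backwards for the newest commit that carries the key would recover the checkpoint.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: each list entry models the extraMetadata map of one
// completed commit, ordered oldest -> newest on the timeline.
public class CheckpointLookup {

    static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

    // Fragile behavior (what the comment above observes): only the very last
    // commit is consulted, so a commit from another writer that lacks the key
    // makes the checkpoint appear lost.
    static Optional<String> fromLastCommitOnly(List<Map<String, String>> commits) {
        if (commits.isEmpty()) {
            return Optional.empty();
        }
        Map<String, String> last = commits.get(commits.size() - 1);
        return Optional.ofNullable(last.get(CHECKPOINT_KEY));
    }

    // More robust alternative: walk the timeline newest -> oldest and use the
    // most recent commit that actually carries a checkpoint.
    static Optional<String> fromLatestCommitWithCheckpoint(List<Map<String, String>> commits) {
        for (int i = commits.size() - 1; i >= 0; i--) {
            String cp = commits.get(i).get(CHECKPOINT_KEY);
            if (cp != null) {
                return Optional.of(cp);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A DeltaStreamer commit that recorded a Kafka checkpoint...
        Map<String, String> deltaStreamerCommit = new LinkedHashMap<>();
        deltaStreamerCommit.put("schema", "...");
        deltaStreamerCommit.put(CHECKPOINT_KEY, "topic-name,0:100,1:150");

        // ...followed by a backfill commit that carries no checkpoint key.
        Map<String, String> backfillCommit = new LinkedHashMap<>();
        backfillCommit.put("schema", "...");

        List<Map<String, String>> timeline = Arrays.asList(deltaStreamerCommit, backfillCommit);

        // Last commit came from the backfill writer: the checkpoint looks lost.
        System.out.println("lastOnly=" + fromLastCommitOnly(timeline).orElse("MISSING"));
        // Scanning backwards recovers it.
        System.out.println("scanBack=" + fromLatestCommitWithCheckpoint(timeline).orElse("MISSING"));
    }
}
```

The backwards scan is one plausible mitigation: DeltaStreamer would resume from the last commit it wrote rather than from the last commit on the timeline, regardless of which writer landed most recently.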