[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sivabalan narayanan reassigned HUDI-7507:
-----------------------------------------
Assignee: sivabalan narayanan

> ongoing concurrent writers with smaller timestamp can cause issues with
> table services
> ---------------------------------------------------------------------------------------
>
>                 Key: HUDI-7507
>                 URL: https://issues.apache.org/jira/browse/HUDI-7507
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: table-service
>            Reporter: Krishen Bhan
>            Assignee: sivabalan narayanan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.16.0
>
>         Attachments: Flowchart (1).png, Flowchart.png
>
>
> *Scenarios:*
> Although HUDI operations hold a table lock when creating a .requested
> instant, HUDI writers do not generate a timestamp and create a .requested
> plan in the same transaction, so the following scenario is possible:
> # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
> # Job 1 schedules and creates its requested file with instant timestamp (x)
> # Job 2 schedules and creates its requested file with instant timestamp (x-1)
> # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can
> cause issues:
> ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction,
> then when Job 1 runs before Job 2, it can create a compaction plan covering all
> instant times (up to (x)) that doesn't include instant time (x-1). Later, Job 2
> will create instant time (x-1), but the timeline will be in a corrupted state
> since the compaction plan was supposed to include (x-1).
> ** There is a similar issue with clean. If Job 2 is a long-running commit
> (that was stuck/delayed for a while before creating its .requested plan) and
> Job 1 is a clean, then Job 1 can perform a clean that updates the
> earliest-commit-to-retain without waiting for the inflight instant by Job 2
> at (x-1) to complete. This causes Job 2 to be "skipped" by clean.
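The compaction race above can be illustrated with a minimal sketch. This is hypothetical demonstration code, not Hudi internals: the timeline is modeled as a sorted set of long timestamps, and the point is simply that a plan built between the two `.requested` creations silently misses the smaller, later-created instant.

```java
import java.util.TreeSet;

// Hypothetical illustration (not actual Hudi code) of the race described
// above: timestamp generation and .requested creation are not one
// transaction, so an instant created later can carry a smaller timestamp,
// and a compaction plan built in between misses it.
class TimestampRaceDemo {

    // Returns true when the compaction plan misses the late (x-1) instant.
    static boolean planMissesLateInstant() {
        TreeSet<Long> timeline = new TreeSet<>();

        long x = 100;       // Job 1 chooses timestamp x
        long xMinus1 = 99;  // Job 2 chooses timestamp x-1 slightly earlier

        // Job 1 reaches the timeline first and plans a compaction that is
        // supposed to cover every instant <= x.
        timeline.add(x);
        TreeSet<Long> compactionPlan = new TreeSet<>(timeline.headSet(x, true));

        // Only now does Job 2 create its .requested file at x-1.
        timeline.add(xMinus1);

        // The timeline contains x-1, but the already-created plan does not.
        return timeline.contains(xMinus1) && !compactionPlan.contains(xMinus1);
    }
}
```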
> ** If the completed commit files include some sort of "checkpointing" with
> another "downstream job" performing incremental reads on this dataset (such
> as Hoodie Streamer/DeltaSync), then there may be incorrect behavior, such as
> the incremental reader skipping some completed commits (that have a smaller
> instant timestamp than the latest completed commit but were created after it).
> [Edit] I added a diagram to visualize the issue, specifically the second
> scenario with clean
> !Flowchart (1).png!
> *Proposed approach:*
> One way this can be resolved is by combining the operations of generating an
> instant time and creating a requested file into the same HUDI table
> transaction. Specifically, execute the following steps whenever any instant
> (commit, table service, etc.) is scheduled:
> Approach A
> # Acquire the table lock
> # Look at the latest instant C on the active timeline (completed or not),
> and generate a timestamp after C
> # Create the plan and requested file using this new timestamp (which is
> greater than C)
> # Release the table lock
> Unfortunately, (A) has the following drawbacks:
> * Every operation must now hold the table lock while computing its plan,
> even if that computation is expensive and takes a while
> * Users of HUDI cannot easily set their own instant time for an operation;
> this restriction would break any public APIs that allow it and would
> require deprecating those APIs.
>
> An alternate approach is to have every operation abort creating a .requested
> file unless it has the latest timestamp. Specifically, for any instant type,
> whenever an operation is about to create a .requested plan on the timeline,
> it should take the table lock and assert that there are no other instants on
> the timeline greater than it that could cause a conflict. If that assertion
> fails, throw a retryable conflict-resolution exception.
> Specifically, the following steps should be followed whenever any instant
> (commit, table service, etc.) is scheduled:
> Approach B
> # Acquire the table lock. Assume that the desired instant time C and the
> requested file plan metadata have already been created, regardless of
> whether that happened before this step or right after acquiring the table
> lock.
> # If there are any instants on the timeline greater than C (regardless of
> their operation type or state), release the table lock and throw an
> exception
> # Create the requested plan on the timeline (as usual)
> # Release the table lock
> Unlike (A), this approach (B) allows users to continue to use HUDI APIs
> where the caller can specify the instant time (avoiding the need to
> deprecate any public API). It also allows table service operations to
> compute their plans without holding a lock. Despite this, (B) has the
> following drawbacks:
> * It is not immediately clear how MDT vs base table operations should be
> handled here. Do we need to update (2) to consider both the base table and
> MDT timelines (rather than just the MDT)?
> * This error will still be thrown even in concurrent scenarios where it
> would be safe to continue. For example, assume two ingestion writers are
> executing on a dataset, each performing only an insert commit (with no
> compact/clean being scheduled on the MDT). Additionally, assume there is no
> "downstream" job performing incremental reads on this dataset. If the writer
> that started scheduling later ends up with an earlier timestamp, it would
> still be safe for it to continue. Despite that, because of step (2) it would
> still have to abort and throw an error. This also means that on datasets
> with many frequent concurrent ingestion commits and very infrequent metadata
> compactions, there would be a lot of transient failures/noise from failing
> writers if this timestamp-delay issue happens often.
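The four steps of Approach B can be sketched as a small check-then-create routine. This is a hypothetical sketch, not Hudi's scheduler: the lock, the `IllegalStateException` standing in for a retryable conflict exception, and the string-list timeline are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of Approach B (not Hudi's actual scheduler).
// Instant times are lexicographically ordered strings, as Hudi-style
// timestamps compare correctly that way.
class InstantScheduler {
    private final ReentrantLock tableLock = new ReentrantLock();

    /**
     * Tries to create a .requested plan at instant time c, assuming c and
     * its plan metadata were already generated (before or after locking).
     */
    void scheduleInstant(String c, List<String> timelineInstants) {
        tableLock.lock();                       // step 1: acquire table lock
        try {
            // step 2: abort if any instant (any type, any state) exceeds c
            for (String other : timelineInstants) {
                if (other.compareTo(c) > 0) {
                    // stands in for a retryable conflict-resolution exception
                    throw new IllegalStateException(
                        "Conflict: instant " + other + " > " + c + ", retry");
                }
            }
            // step 3: create the .requested plan on the timeline (simulated)
            timelineInstants.add(c);
        } finally {
            tableLock.unlock();                 // step 4: release table lock
        }
    }
}
```

A writer hitting the step-2 exception would regenerate its instant time and retry, which is what makes the failure transient rather than fatal.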
> Between these two approaches, (B) seems preferable since it allows users to
> keep using the existing APIs for the time being.
> We were wondering if the Apache HUDI project team would be interested in
> investigating and implementing (B) to resolve this issue?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)