[jira] [Created] (HUDI-7946) [Umbrella] RFC-79: Improving reliability of concurrent table service executions and rollbacks
Krishen Bhan created HUDI-7946:
--
Summary: [Umbrella] RFC-79: Improving reliability of concurrent table service executions and rollbacks
Key: HUDI-7946
URL: https://issues.apache.org/jira/browse/HUDI-7946
Project: Apache Hudi
Issue Type: Epic
Components: multi-writer, table-service
Reporter: Krishen Bhan

This is the umbrella ticket that tracks the overall implementation of RFC-79.
[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krishen Bhan updated HUDI-7507:
---

Description:

*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, HUDI writers do not generate a timestamp and create a .requested plan in the same transaction. As a result, the following scenario is possible:
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
# Job 1 schedules and creates a requested file with instant timestamp (x)
# Job 2 schedules and creates a requested file with instant timestamp (x-1)
# Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, Job 1 can run before Job 2 and create a compaction plan for all instant times up to (x), which does not include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state, since the compaction plan was supposed to include (x-1).
* There is a similar issue with clean. If Job 2 is a long-running commit (stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, Job 1 can perform a clean that advances the earliest-commit-to-retain without waiting for Job 2's inflight instant at (x-1) to complete. This causes Job 2 to be "skipped" by clean.
* If the completed commit files include some sort of "checkpointing" and a "downstream job" performs incremental reads on this dataset (such as Hoodie Streamer/DeltaSync), there may be incorrect behavior, such as the incremental reader skipping completed commits that have a smaller instant timestamp than the latest completed commit but were created after it.

[Edit] I added a diagram to visualize the issue, specifically the second scenario with clean
!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the generation of the instant time and the creation of the requested file into a single HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled:

Approach A
# Acquire the table lock
# Look at the latest instant C on the active timeline (completed or not) and generate a timestamp after C
# Create the plan and requested file using this new timestamp (which is greater than C)
# Release the table lock

Unfortunately, (A) has the following drawbacks:
* Every operation must now hold the table lock while computing its plan, even when that is an expensive computation that takes a while
* Users of HUDI cannot easily set their own instant time for an operation; this restriction would break any public APIs that allow it and would require deprecating those APIs

An alternate approach is to have every operation abort creating its .requested file unless it has the latest timestamp. Specifically, for any instant type, whenever an operation is about to create a .requested plan on the timeline, it should take the table lock and assert that there are no other instants on the timeline greater than it that could cause a conflict. If that assertion fails, it should throw a retryable conflict-resolution exception. Concretely, the following steps would be followed whenever any instant (commit, table service, etc.) is scheduled (see the sketch after this list):

Approach B
# Acquire the table lock. Assume that the desired instant time C and the requested-file plan metadata have already been created, whether that happened before this step or right after acquiring the table lock.
# If there are any instants on the timeline greater than C (regardless of their operation type or state), release the table lock and throw an exception
# Create the requested plan on the timeline (as usual)
# Release the table lock

Unlike (A), approach (B) allows users to keep using the HUDI APIs where the caller specifies the instant time (avoiding the need to deprecate any public API). It also allows table service operations to compute their plans without holding a lock. Despite this, (B) has the following drawbacks:
* It is not immediately clear how MDT vs base table operations should be handled here. Do we need to update (2) to consider both the base table and MDT timelines (rather than just the MDT)?
* This error will still be thrown in concurrent-operation scenarios where it would be safe to continue. For example, assume two ingestion writers are executing on a dataset, each performing only an insert commit (with no compact/clean being scheduled on the MDT). Additionally, assume there is no "downstream" job performing incremental reads on this dataset. If the writer that started scheduling later ends up with an earlier timestamp, it would still be safe for it to continue. Despite that, because of (2) it would still have to abort and throw an error.
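The following is a minimal sketch of the Approach B guard described above. The Timeline interface, RetryableConflictException, and lock wiring are hypothetical stand-ins (real Hudi classes such as HoodieActiveTimeline and the transaction manager have different shapes); instant times are assumed to compare lexicographically, as Hudi's yyyyMMddHHmmssSSS timestamps do.

{code:java}
import java.util.List;
import java.util.concurrent.locks.Lock;

// Hypothetical minimal timeline abstraction, for illustration only.
interface Timeline {
  List<String> allInstantTimes(); // every instant, regardless of action/state
  void createRequested(String instantTime, byte[] plan);
}

// Retryable signal: the caller should regenerate its timestamp and try again.
class RetryableConflictException extends RuntimeException {
  RetryableConflictException(String msg) { super(msg); }
}

final class ApproachBScheduler {
  private final Lock tableLock;
  private final Timeline timeline;

  ApproachBScheduler(Lock tableLock, Timeline timeline) {
    this.tableLock = tableLock;
    this.timeline = timeline;
  }

  /** Creates the .requested file for instantTime only if no greater instant exists. */
  void schedule(String instantTime, byte[] plan) {
    tableLock.lock();                               // step 1: acquire table lock
    try {
      // step 2: abort if any instant on the timeline is greater than C
      boolean newerExists = timeline.allInstantTimes().stream()
          .anyMatch(t -> t.compareTo(instantTime) > 0);
      if (newerExists) {
        throw new RetryableConflictException(
            "An instant newer than " + instantTime + " already exists");
      }
      timeline.createRequested(instantTime, plan);  // step 3: create requested plan
    } finally {
      tableLock.unlock();                           // step 4: release table lock
    }
  }
}
{code}

Note that the plan itself can be computed before the lock is taken; only the check-and-create of the .requested file sits in the critical section.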
[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krishen Bhan updated HUDI-7507:
---

Description:

*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, HUDI writers do not generate a timestamp and create a .requested plan in the same transaction. As a result, the following scenario is possible:
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
# Job 1 schedules and creates a requested file with instant timestamp (x)
# Job 2 schedules and creates a requested file with instant timestamp (x-1)
# Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, Job 1 can run before Job 2 and create a compaction plan for all instant times up to (x), which does not include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state, since the compaction plan was supposed to include (x-1).
* There is a similar issue with clean. If Job 2 is a long-running commit (stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, Job 1 can perform a clean that advances the earliest-commit-to-retain without waiting for Job 2's inflight instant at (x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second scenario with clean
!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the generation of the instant time and the creation of the requested file into a single HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled:

Approach A
# Acquire the table lock
# Look at the latest instant C on the active timeline (completed or not) and generate a timestamp after C
# Create the plan and requested file using this new timestamp (which is greater than C)
# Release the table lock

Unfortunately, (A) has the following drawbacks:
* Every operation must now hold the table lock while computing its plan, even when that is an expensive computation that takes a while
* Users of HUDI cannot easily set their own instant time for an operation; this restriction would break any public APIs that allow it and would require deprecating those APIs

An alternate approach is to have every operation abort creating its .requested file unless it has the latest timestamp. Specifically, for any instant type, whenever an operation is about to create a .requested plan on the timeline, it should take the table lock and assert that there are no other instants on the timeline greater than it that could cause a conflict. If that assertion fails, it should throw a retryable conflict-resolution exception. Concretely, the following steps would be followed whenever any instant (commit, table service, etc.) is scheduled (see the sketch after this list):

Approach B
# Acquire the table lock. Assume that the desired instant time C and the requested-file plan metadata have already been created, whether that happened before this step or right after acquiring the table lock.
# Get the set of all instants on the timeline that are greater than C (regardless of their operation type or state).
## If the current operation is an "ingestion" type (commit/deltacommit/insert_overwrite replacecommit), assert that the set is empty. This is because another "ingestion" operation with a later instant time might schedule and execute a compaction at that instant time in the MDT, leaving the table in the aforementioned situation where a compaction on the MDT is scheduled after an inflight ingestion commit.
## If the current operation is a "table service" (clean/compaction/cluster), assert that the set does not contain any table service instant types (clean/compaction/cluster).
# Create the requested plan on the timeline (as usual)
# Release the table lock

Unlike (A), approach (B) allows users to keep using the HUDI APIs where the caller specifies the instant time (avoiding the need to deprecate any public API). It also allows table service operations to compute their plans without holding a lock. Despite this, (B) has the following drawbacks:
* It is not immediately clear how MDT vs base table operations should be handled here. Do we need to update (2) to build its set from both the base table and MDT timelines (rather than just the MDT)?
* This error will still be thrown in concurrent-operation scenarios where it would be safe to continue. For example, assume two ingestion writers are executing on a dataset, each performing only an insert commit (with no compact/clean being scheduled on the MDT). Additionally, assume there is no "downstream" job performing incremental reads on this dataset. If the writer that started scheduling later ends up with an earlier timestamp, it would still be safe for it to continue; because of step (2.1), however, it would still have to abort and throw an error.
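A sketch of the differentiated check in steps 2.1/2.2 above, under the same caveats: the action taxonomy, Instant record, and TimelineView interface are illustrative assumptions rather than Hudi's real types, and the open MDT-timeline question from the drawbacks is deliberately left out.

{code:java}
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;

// Hypothetical action taxonomy (illustrative; Hudi uses action strings).
enum Action { COMMIT, DELTACOMMIT, REPLACECOMMIT, CLEAN, COMPACTION, CLUSTERING }

record Instant(String time, Action action) {}

interface TimelineView {
  List<Instant> allInstants(); // every instant file, regardless of state
  void createRequested(Instant instant, byte[] plan);
}

final class CategorizedScheduler {
  private static final Set<Action> TABLE_SERVICES =
      EnumSet.of(Action.CLEAN, Action.COMPACTION, Action.CLUSTERING);

  private final Lock tableLock;
  private final TimelineView view;

  CategorizedScheduler(Lock tableLock, TimelineView view) {
    this.tableLock = tableLock;
    this.view = view;
  }

  void schedule(Instant toSchedule, byte[] plan) {
    tableLock.lock();                         // step 1: acquire table lock
    try {
      // step 2: collect instants with a greater timestamp, any action/state
      List<Instant> newer = view.allInstants().stream()
          .filter(i -> i.time().compareTo(toSchedule.time()) > 0)
          .toList();
      if (!TABLE_SERVICES.contains(toSchedule.action())) {
        // step 2.1: an ingestion commit must hold the latest timestamp, since a
        // later ingestion writer may schedule an MDT compaction at its own time
        if (!newer.isEmpty()) {
          throw new IllegalStateException("Retryable conflict: newer instant exists");
        }
      } else {
        // step 2.2: a table service only conflicts with newer table services
        if (newer.stream().anyMatch(i -> TABLE_SERVICES.contains(i.action()))) {
          throw new IllegalStateException("Retryable conflict: newer table service");
        }
      }
      view.createRequested(toSchedule, plan); // step 3: create requested plan
    } finally {
      tableLock.unlock();                     // step 4: release table lock
    }
  }
}
{code}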
[jira] [Created] (HUDI-7687) Instant should not be archived until replaced file groups or older file versions are deleted
Krishen Bhan created HUDI-7687:
--
Summary: Instant should not be archived until replaced file groups or older file versions are deleted
Key: HUDI-7687
URL: https://issues.apache.org/jira/browse/HUDI-7687
Project: Apache Hudi
Issue Type: Improvement
Reporter: Krishen Bhan

When archival runs, it may consider an instant a candidate for archival even if the file groups that instant replaced/updated still need to undergo a clean. For example, consider the following scenario with clean and archival scheduled/executed independently in different jobs:
# Insert at C1 creates file group f1 in a partition
# Replacecommit at RC2 creates file group f2 in the partition, and replaces f1
# Any reader of the partition that calls the HUDI API (with or without using the MDT) will recognize that f1 should be ignored, as it has been replaced. This is because the RC2 instant file is in the active timeline
# Some more instants are added to the timeline. RC2 is now eligible to be cleaned (as per the table writer's clean policy). Assume, though, that the file groups replaced by RC2 haven't been deleted yet, for example because clean is repeatedly failing, an async clean hasn't been scheduled yet, or the clean failed to delete said file groups
# An archive job is eventually triggered and archives C1 and RC2. Note that f1 is still in the partition

Now the table has the same consistency issue as seen in https://issues.apache.org/jira/browse/HUDI-7655 , where replaced file groups are still in the partition and readers may see inconsistent data. This situation can be avoided by ensuring that archival "blocks" and does not go past an older instant time if it sees that said instant hasn't undergone a clean yet (see the sketch below).
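One way to express the proposed "blocking" rule: archival walks its candidates oldest-first and stops at the first instant whose replaced/older file versions have not yet been removed by clean. A minimal sketch, assuming a hypothetical fullyCleaned flag that a real implementation would have to derive from clean and replace metadata:

{code:java}
import java.util.List;

// Illustrative stand-in; real Hudi archival operates on HoodieInstant
// plus clean/replace metadata, not a precomputed flag.
record CompletedInstant(String time, boolean fullyCleaned) {}

final class ArchivalGuard {
  /**
   * Returns the prefix of candidates that is safe to archive: everything up to
   * (but excluding) the first instant that still has un-cleaned replaced files.
   */
  static List<CompletedInstant> archivablePrefix(List<CompletedInstant> oldestFirst) {
    int cut = 0;
    for (CompletedInstant c : oldestFirst) {
      if (!c.fullyCleaned()) {
        break; // block archival at this instant and at everything newer
      }
      cut++;
    }
    return oldestFirst.subList(0, cut);
  }
}
{code}

In the scenario above, RC2 would report fullyCleaned=false until f1 is actually deleted, so archival would stop before RC2 and never remove it from the active timeline while f1 is still present in the partition.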
[jira] [Updated] (HUDI-7655) Support configuration for clean to fail execution if at least one file is marked as a failed delete
[ https://issues.apache.org/jira/browse/HUDI-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krishen Bhan updated HUDI-7655:
---

Description:
When a HUDI clean plan is executed, any targeted file that was not confirmed as deleted (or non-existing) is marked as a "failed delete". Although these failed deletes are added to the `.clean` metadata, if incremental clean is used then these files might never be picked up by a future clean plan, unless a "full-scan" clean ends up being scheduled. Besides files unnecessarily taking up storage space for longer, this can lead to the following dataset consistency issue for COW datasets:
# Insert at C1 creates file group f1 in a partition
# Replacecommit at RC2 creates file group f2 in the partition, and replaces f1
# Any reader of the partition that calls the HUDI API (with or without using the MDT) will recognize that f1 should be ignored, as it has been replaced. This is because the RC2 instant file is in the active timeline
# Some completed instants later, an incremental clean is scheduled. It moves the "earliest commit to retain" to a time after instant time RC2, so it targets f1 for deletion. But during execution of the plan, it fails to delete f1
# An archive job is eventually triggered, and archives C1 and RC2. Note that f1 is still in the partition

At this point, any job/query that reads the aforementioned partition directly via DFS file system calls (without using the MDT FILES partition) will consider both f1 and f2 to be valid file groups, since RC2 is no longer in the active timeline. This is a data consistency issue, and it will only be resolved if a "full-scan" clean is triggered and deletes f1.

This specific scenario can be avoided if the user can configure HUDI clean to fail execution of a clean plan unless all files are confirmed as deleted (or already absent from DFS), "blocking" the clean. The next clean attempt will then re-execute the existing plan, since clean plans cannot be "rolled back" (a sketch of this behavior follows at the end of this entry).

was:
When a HUDI clean plan is executed, any targeted file that was not confirmed as deleted (or non-existing) is marked as a "failed delete". Although these failed deletes are added to the `.clean` metadata, if incremental clean is used then these files might never be picked up by a future clean plan, unless a "full-scan" clean ends up being scheduled. Besides files unnecessarily taking up storage space for longer, this can lead to the following dataset consistency issue for COW datasets:
# Insert at C1 creates file group f1 in a partition
# Replacecommit at RC2 creates file group f2 in the partition, and replaces f1
# Any reader of the partition that calls the HUDI API (with or without using the MDT) will recognize that f1 should be ignored, as it has been replaced. This is because the RC2 instant file is in the active timeline
# Some completed instants later, an incremental clean is scheduled. It moves the "earliest commit to retain" to a time after instant time RC2, so it targets f1 for deletion. But during execution of the plan, it fails to delete f1
# An archive job is eventually triggered, and archives C1. Note that f1 is still in the partition

At this point, any job/query that reads the aforementioned partition directly via DFS file system calls (without using the MDT FILES partition) will consider both f1 and f2 to be valid file groups, since RC2 is no longer in the active timeline. This is a data consistency issue, and it will only be resolved if a "full-scan" clean is triggered and deletes f1.

This specific scenario can be avoided if the user can configure HUDI clean to fail execution of a clean plan unless all files are confirmed as deleted (or already absent from DFS), "blocking" the clean. The next clean attempt will then re-execute the existing plan, since clean plans cannot be "rolled back".

> Support configuration for clean to fail execution if at least one file is
> marked as a failed delete
>
> Key: HUDI-7655
> URL: https://issues.apache.org/jira/browse/HUDI-7655
> Project: Apache Hudi
> Issue Type: Improvement
> Reporter: Krishen Bhan
> Priority: Minor
> Labels: clean
>
> When a HUDI clean plan is executed, any targeted file that was not confirmed
> as deleted (or non-existing) is marked as a "failed delete". Although
> these failed deletes are added to `.clean` metadata, if incremental clean
> is used then these files might never be picked up by a future clean
> plan, unless a "full-scan" clean ends up being scheduled. Besides
> files unnecessarily taking up storage space for longer, this
> can lead to the following dataset consistency issue for COW
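A sketch of what the proposed configuration could look like during clean-plan execution, assuming a hypothetical failOnFailedDeletes flag and Storage interface (names are illustrative, not Hudi's actual config keys or file-system API):

{code:java}
import java.util.ArrayList;
import java.util.List;

interface Storage {
  // Returns true if the file was deleted or did not exist; false on failure.
  boolean deleteIfExists(String path);
}

final class CleanExecutor {
  private final Storage storage;
  private final boolean failOnFailedDeletes; // the proposed (hypothetical) config

  CleanExecutor(Storage storage, boolean failOnFailedDeletes) {
    this.storage = storage;
    this.failOnFailedDeletes = failOnFailedDeletes;
  }

  void execute(List<String> plannedDeletes) {
    List<String> failed = new ArrayList<>();
    for (String path : plannedDeletes) {
      if (!storage.deleteIfExists(path)) {
        failed.add(path); // today this is only recorded as a "failed delete"
      }
    }
    if (failOnFailedDeletes && !failed.isEmpty()) {
      // Failing here "blocks" the clean: since clean plans are never rolled
      // back, the next clean attempt re-executes this same plan and retries
      // the deletes, instead of silently moving past files like f1.
      throw new IllegalStateException(
          failed.size() + " planned deletes were not confirmed: " + failed);
    }
  }
}
{code}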
[jira] [Created] (HUDI-7655) Support configuration for clean to fail execution if at least one file is marked as a failed delete
Krishen Bhan created HUDI-7655:
--
Summary: Support configuration for clean to fail execution if at least one file is marked as a failed delete
Key: HUDI-7655
URL: https://issues.apache.org/jira/browse/HUDI-7655
Project: Apache Hudi
Issue Type: Improvement
Reporter: Krishen Bhan

When a HUDI clean plan is executed, any targeted file that was not confirmed as deleted (or non-existing) is marked as a "failed delete". Although these failed deletes are added to the `.clean` metadata, if incremental clean is used then these files might never be picked up by a future clean plan, unless a "full-scan" clean ends up being scheduled. Besides files unnecessarily taking up storage space for longer, this can lead to the following dataset consistency issue for COW datasets:
# Insert at C1 creates file group f1 in a partition
# Replacecommit at RC2 creates file group f2 in the partition, and replaces f1
# Any reader of the partition that calls the HUDI API (with or without using the MDT) will recognize that f1 should be ignored, as it has been replaced. This is because the RC2 instant file is in the active timeline
# Some completed instants later, an incremental clean is scheduled. It moves the "earliest commit to retain" to a time after instant time RC2, so it targets f1 for deletion. But during execution of the plan, it fails to delete f1
# An archive job is eventually triggered, and archives C1. Note that f1 is still in the partition

At this point, any job/query that reads the aforementioned partition directly via DFS file system calls (without using the MDT FILES partition) will consider both f1 and f2 to be valid file groups, since RC2 is no longer in the active timeline. This is a data consistency issue, and it will only be resolved if a "full-scan" clean is triggered and deletes f1.

This specific scenario can be avoided if the user can configure HUDI clean to fail execution of a clean plan unless all files are confirmed as deleted (or already absent from DFS), "blocking" the clean. The next clean attempt will then re-execute the existing plan, since clean plans cannot be "rolled back".
[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krishen Bhan updated HUDI-7507:
---

Description:

*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, HUDI writers do not generate a timestamp and create a .requested plan in the same transaction. As a result, the following scenario is possible:
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
# Job 1 schedules and creates a requested file with instant timestamp (x)
# Job 2 schedules and creates a requested file with instant timestamp (x-1)
# Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, Job 1 can run before Job 2 and create a compaction plan for all instant times up to (x), which does not include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state, since the compaction plan was supposed to include (x-1).
* There is a similar issue with clean. If Job 2 is a long-running commit (stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, Job 1 can perform a clean that advances the earliest-commit-to-retain without waiting for Job 2's inflight instant at (x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second scenario with clean
!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the generation of the instant time and the creation of the requested file into a single HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled:

Approach A
# Acquire the table lock
# Look at the latest instant C on the active timeline (completed or not) and generate a timestamp after C
# Create the plan and requested file using this new timestamp (which is greater than C)
# Release the table lock

Unfortunately, (A) has the following drawbacks:
* Every operation must now hold the table lock while computing its plan, even when that is an expensive computation that takes a while
* Users of HUDI cannot easily set their own instant time for an operation; this restriction would break any public APIs that allow it and would require deprecating those APIs

An alternate approach is to have every operation abort creating its .requested file unless it has the latest timestamp. Specifically, for any instant type, whenever an operation is about to create a .requested plan on the timeline, it should take the table lock and assert that there are no other instants on the timeline greater than it that could cause a conflict. If that assertion fails, it should throw a retryable conflict-resolution exception. Concretely, the following steps would be followed whenever any instant (commit, table service, etc.) is scheduled:

Approach B
# Acquire the table lock. Assume that the desired instant time C and the requested-file plan metadata have already been created, whether that happened before this step or right after acquiring the table lock.
# Get the set of all instants on the timeline that are greater than C (regardless of their action or state).
## If the current operation is an ingestion type (commit/deltacommit/insert_overwrite replacecommit), assert that the set is empty
## If the current operation is a table service, assert that the set does not contain any table service instant types
# Create the requested plan on the timeline (as usual)
# Release the table lock

Unlike (A), approach (B) allows users to keep using the HUDI APIs where the caller specifies the instant time (avoiding the need to deprecate any public API). It also allows table service operations to compute their plans without holding a lock. Despite this, (B) has the following drawbacks:
* It is not immediately clear how MDT vs base table operations should be handled here. At first glance it seems that at step (2) both the base table and MDT timelines should be checked, but that needs more investigation to confirm.
* This error will still be thrown for combinations of concurrent operations where it would be safe to continue. For example, assume two ingestion writers are executing on a dataset, each performing only an insert commit (with no table service being scheduled). If the writer that started scheduling later ends up with an earlier timestamp, it would still be safe for it to continue. Despite that, because of step (2.1) it would still have to abort and throw an error. This means that on datasets with many frequent concurrent ingestion commits and very infrequent table service operations, there would be a lot of transient failures/noise from failing writers.
[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krishen Bhan updated HUDI-7507:
---

Description:

*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, HUDI writers do not generate a timestamp and create a .requested plan in the same transaction. As a result, the following scenario is possible:
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
# Job 1 schedules and creates a requested file with instant timestamp (x)
# Job 2 schedules and creates a requested file with instant timestamp (x-1)
# Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, Job 1 can run before Job 2 and create a compaction plan for all instant times up to (x), which does not include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state, since the compaction plan was supposed to include (x-1).
* There is a similar issue with clean. If Job 2 is a long-running commit (stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, Job 1 can perform a clean that advances the earliest-commit-to-retain without waiting for Job 2's inflight instant at (x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second scenario with clean
!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the generation of the instant time and the creation of the requested file into a single HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled:

Approach A
# Acquire the table lock
# Look at the latest instant C on the active timeline (completed or not) and generate a timestamp after C
# Create the plan and requested file using this new timestamp (which is greater than C)
# Release the table lock

Unfortunately, (A) has the following drawbacks:
* Every operation must now hold the table lock while computing its plan, even when that is an expensive computation that takes a while
* Users of HUDI cannot easily set their own instant time for an operation; this restriction would break any public APIs that allow it and would require deprecating those APIs

An alternate approach is to have every operation abort creating its .requested file unless it has the latest timestamp. Specifically, for any instant type, whenever an operation is about to create a .requested plan on the timeline, it should take the table lock and assert that there are no other instants on the timeline (inflight or otherwise) that are greater than it. If that assertion fails, it should throw a retryable conflict-resolution exception. Concretely, the following steps would be followed whenever any instant (commit, table service, etc.) is scheduled:

Approach B
# Acquire the table lock. Assume that the desired instant time C and the requested-file plan metadata have already been created, whether that happened before this step or right after acquiring the table lock.
# Check whether there are any instant files on the timeline greater than C (regardless of their action or state). If so, raise a custom exception
# Create the requested plan on the timeline (as usual)
# Release the table lock

Unlike (A), approach (B) allows users to keep using the HUDI APIs where the caller specifies the instant time (avoiding the need to deprecate any public API). It also allows table service operations to compute their plans without holding a lock. Despite this, (B) has the following drawbacks:
* It is not immediately clear how MDT vs base table operations should be handled here. At first glance it seems that at step (2) both the base table and MDT timelines should be checked, but that needs more investigation to confirm.
* This error will still be thrown for combinations of concurrent operations where it would be safe to continue. For example, assume two ingestion writers are executing on a dataset, each performing only an insert commit (with no table service being scheduled). If the writer that started scheduling later ends up with an earlier timestamp, it would still be safe for it to continue. Despite that, because of step (2) it would still have to abort and throw an error. This means that on datasets with many frequent concurrent ingestion commits and very infrequent table service operations, there would be a lot of transient failures/noise from failing writers. This step (2) could potentially be revised to avoid this scenario (by only checking for certain actions like table services), but that would add complexity and it is not clear at first glance if that would
[jira] [Updated] (HUDI-7503) concurrent executions of table service plan should not corrupt dataset
[ https://issues.apache.org/jira/browse/HUDI-7503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krishen Bhan updated HUDI-7503:
---

Summary: concurrent executions of table service plan should not corrupt dataset (was: concurrent executions of compaction plan should not corrupt dataset)

> concurrent executions of table service plan should not corrupt dataset
> --
>
> Key: HUDI-7503
> URL: https://issues.apache.org/jira/browse/HUDI-7503
> Project: Apache Hudi
> Issue Type: Improvement
> Components: compaction, table-service
> Reporter: Krishen Bhan
> Priority: Minor
>
> Currently it is not safe for 2+ writers to concurrently call
> `org.apache.hudi.client.BaseHoodieTableServiceClient#compact` on the same
> compaction instant. This is because one writer might execute the instant and
> create an inflight, while the other writer sees the inflight and tries to
> roll it back before re-attempting to execute it (since it will assume said
> inflight was a previously failed compaction attempt).
> This logic should be updated such that only one writer actually executes
> the compaction plan at a time (and the others fail/abort).
> One approach is to use a transaction (base table lock) in conjunction with
> heartbeating, to ensure that the writer triggers a heartbeat before executing
> compaction; any concurrent writer then uses the heartbeat to check whether
> the compaction is currently being executed by another writer. Specifically,
> the compact API should execute the following steps (see the sketch below):
> # Get the instant to compact, C (as usual)
> # Start a transaction
> # Check whether C has an active heartbeat; if so, finish the transaction and
> throw an exception
> # Start a heartbeat for C (this implicitly restarts the heartbeat if it
> was started before by another job)
> # Finish the transaction
> # Run the existing compact API logic on C
> # If execution succeeds, clean up the heartbeat file. If it fails, do nothing
> (the heartbeat will be automatically expired later anyway).
> Note that this approach only holds the table lock temporarily, while
> checking/starting the heartbeat.
> Also, this flow can be applied to the execution of clean plans and other
> table services.
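A sketch of the heartbeat-guarded execution flow in the steps above. The HeartbeatService interface and lock wiring are assumptions for illustration; Hudi's actual heartbeat client and transaction manager differ in shape.

{code:java}
import java.util.concurrent.locks.Lock;

// Hypothetical heartbeat abstraction, for illustration only.
interface HeartbeatService {
  boolean isActive(String instantTime); // is there an unexpired heartbeat?
  void start(String instantTime);       // starts (or restarts) the heartbeat
  void stop(String instantTime);        // deletes the heartbeat file
}

final class GuardedCompactor {
  private final Lock tableLock;
  private final HeartbeatService heartbeats;

  GuardedCompactor(Lock tableLock, HeartbeatService heartbeats) {
    this.tableLock = tableLock;
    this.heartbeats = heartbeats;
  }

  void compact(String instantTime, Runnable existingCompactLogic) {
    tableLock.lock();                        // step 2: start transaction
    try {
      if (heartbeats.isActive(instantTime)) {
        // step 3: another writer is already executing this plan
        throw new IllegalStateException(
            "Compaction " + instantTime + " is already being executed");
      }
      heartbeats.start(instantTime);         // step 4: claim the plan
    } finally {
      tableLock.unlock();                    // step 5: lock held only briefly
    }
    existingCompactLogic.run();              // step 6: run existing compact logic
    heartbeats.stop(instantTime);            // step 7: clean up on success
  }
}
{code}

Note that step 7 only runs when the compaction succeeds: if existingCompactLogic throws, the heartbeat is left in place and simply expires later, exactly as the steps above describe.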
[jira] [Commented] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832306#comment-17832306 ]

Krishen Bhan commented on HUDI-7507:

I removed the original alternate approach I added in the ticket description since it can still result in problematic edge cases.
{code:java}
An alternate approach (suggested by Prashant Wason) was to instead have all operations, including table services, perform conflict resolution checks before committing. For example, clean and compaction would generate their plan as usual. But when creating a transaction to write a .requested file, right before creating the file they should check whether another lower-timestamp instant has appeared in the timeline. If so, they should fail/abort without creating the plan. Commit operations would also be updated/verified to have a similar check: before creating a .requested file (during a transaction), the commit operation would check whether a table service plan (clean/compact) with a greater instant time has been created. If so, it would abort/fail. This avoids the drawbacks of the first approach, but will lead to more transient failures that users have to handle. {code}
Specifically, it can still result in an inflight commit on the base table failing while a completed compaction on the MDT with a greater timestamp is present.

> ongoing concurrent writers with smaller timestamp can cause issues with
> table services
> ---
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
> Issue Type: Improvement
> Components: table-service
> Reporter: Krishen Bhan
> Priority: Major
> Attachments: Flowchart (1).png, Flowchart.png
>
> Although HUDI operations hold a table lock when creating a .requested
> instant, HUDI writers do not generate a timestamp and create a
> .requested plan in the same transaction, so there can be a scenario where
> # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses
> timestamp (x-1)
> # Job 1 schedules and creates a requested file with instant timestamp (x)
> # Job 2 schedules and creates a requested file with instant timestamp (x-1)
> # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can
> cause issues:
> * If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction,
> then Job 1 can run before Job 2 and create a compaction plan for all instant
> times (up to (x)) that doesn't include instant time (x-1). Later, Job 2
> will create instant time (x-1), but the timeline will be in a corrupted state
> since the compaction plan was supposed to include (x-1)
> * There is a similar issue with clean. If Job 2 is a long-running commit
> (stuck/delayed for a while before creating its .requested plan) and
> Job 1 is a clean, then Job 1 can perform a clean that updates the
> earliest-commit-to-retain without waiting for Job 2's inflight instant
> at (x-1) to complete. This causes Job 2 to be "skipped" by clean.
> [Edit] I added a diagram to visualize the issue, specifically the second
> scenario with clean
> !Flowchart (1).png!
>
> One way this can be resolved is by combining the generation of the instant
> time and the creation of the requested file into the same HUDI table
> transaction. Specifically, the following steps would be executed whenever
> any instant (commit, table service, etc.) is scheduled
> # Acquire the table lock
> # Look at the latest instant C on the active timeline (completed or not).
> Generate a timestamp after C
> # Create the plan and requested file using this new timestamp (which is
> greater than C)
> # Release the table lock
> Unfortunately this has the following drawbacks
> * Every operation must now hold the table lock while computing its plan, even
> if it's an expensive operation that takes a while
> * Users of HUDI cannot easily set their own instant time for an operation,
> and this restriction would break any public APIs that allow this
> -An alternate approach (suggested by- [~pwason] -) was to instead have all
> operations, including table services, perform conflict resolution checks
> before committing. For example, clean and compaction would generate their
> plan as usual. But when creating a transaction to write a .requested file,
> right before creating the file they should check whether another
> lower-timestamp instant has appeared in the timeline. If so, they should
> fail/abort without creating the plan. Commit operations would also be
> updated/verified to have a similar check: before creating a .requested file
> (during a transaction) the commit operation would check whether a table
> service plan (clean/compact) with a greater instant time has been created.
> And if so,
[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krishen Bhan updated HUDI-7507:
---

Description:
Although HUDI operations hold a table lock when creating a .requested instant, HUDI writers do not generate a timestamp and create a .requested plan in the same transaction, so there can be a scenario where
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
# Job 1 schedules and creates a requested file with instant timestamp (x)
# Job 2 schedules and creates a requested file with instant timestamp (x-1)
# Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then Job 1 can run before Job 2 and create a compaction plan for all instant times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state since the compaction plan was supposed to include (x-1)
* There is a similar issue with clean. If Job 2 is a long-running commit (stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, then Job 1 can perform a clean that updates the earliest-commit-to-retain without waiting for Job 2's inflight instant at (x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second scenario with clean
!Flowchart (1).png!

One way this can be resolved is by combining the generation of the instant time and the creation of the requested file into the same HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled
# Acquire the table lock
# Look at the latest instant C on the active timeline (completed or not). Generate a timestamp after C
# Create the plan and requested file using this new timestamp (which is greater than C)
# Release the table lock

Unfortunately this has the following drawbacks
* Every operation must now hold the table lock while computing its plan, even if it's an expensive operation that takes a while
* Users of HUDI cannot easily set their own instant time of an operation, and this restriction would break any public APIs that allow this

-An alternate approach (suggested by- [~pwason] -) was to instead have all operations, including table services, perform conflict resolution checks before committing. For example, clean and compaction would generate their plan as usual. But when creating a transaction to write a .requested file, right before creating the file they should check whether another lower-timestamp instant has appeared in the timeline. If so, they should fail/abort without creating the plan. Commit operations would also be updated/verified to have a similar check: before creating a .requested file (during a transaction) the commit operation would check whether a table service plan (clean/compact) with a greater instant time has been created. If so, it would abort/fail. This avoids the drawbacks of the first approach, but will lead to more transient failures that users have to handle.-

An alternate approach is to have every operation abort creating its .requested file unless it has the latest timestamp. Specifically, for any instant type, whenever an operation is about to create a .requested plan on the timeline, it should take the table lock and assert that there are no other instants on the timeline (inflight or otherwise) that are greater than it. If that assertion fails, it should throw a retryable conflict-resolution exception.

was:
Although HUDI operations hold a table lock when creating a .requested instant, HUDI writers do not generate a timestamp and create a .requested plan in the same transaction, so there can be a scenario where
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
# Job 1 schedules and creates a requested file with instant timestamp (x)
# Job 2 schedules and creates a requested file with instant timestamp (x-1)
# Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then Job 1 can run before Job 2 and create a compaction plan for all instant times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state since the compaction plan was supposed to include (x-1)
* There is a similar issue with clean. If Job 2 is a long-running commit (stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, then Job 1 can perform a clean that updates the earliest-commit-to-retain without waiting for the inflight instant
[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krishen Bhan updated HUDI-7507:
---

Description:
Although HUDI operations hold a table lock when creating a .requested instant, HUDI writers do not generate a timestamp and create a .requested plan in the same transaction, so there can be a scenario where
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
# Job 1 schedules and creates a requested file with instant timestamp (x)
# Job 2 schedules and creates a requested file with instant timestamp (x-1)
# Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then Job 1 can run before Job 2 and create a compaction plan for all instant times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state since the compaction plan was supposed to include (x-1)
* There is a similar issue with clean. If Job 2 is a long-running commit (stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, then Job 1 can perform a clean that updates the earliest-commit-to-retain without waiting for Job 2's inflight instant at (x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second scenario with clean
!Flowchart (1).png!

One way this can be resolved is by combining the generation of the instant time and the creation of the requested file into the same HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled
# Acquire the table lock
# Look at the latest instant C on the active timeline (completed or not). Generate a timestamp after C
# Create the plan and requested file using this new timestamp (which is greater than C)
# Release the table lock

Unfortunately this has the following drawbacks
* Every operation must now hold the table lock while computing its plan, even if it's an expensive operation that takes a while
* Users of HUDI cannot easily set their own instant time of an operation, and this restriction would break any public APIs that allow this

An alternate approach (suggested by [~pwason]) was to instead have all operations, including table services, perform conflict resolution checks before committing. For example, clean and compaction would generate their plan as usual. But when creating a transaction to write a .requested file, right before creating the file they should check whether another lower-timestamp instant has appeared in the timeline. If so, they should fail/abort without creating the plan. Commit operations would also be updated/verified to have a similar check: before creating a .requested file (during a transaction) the commit operation would check whether a table service plan (clean/compact) with a greater instant time has been created. If so, it would abort/fail. This avoids the drawbacks of the first approach, but will lead to more transient failures that users have to handle.

was:
Although HUDI operations hold a table lock when creating a .requested instant, HUDI writers do not generate a timestamp and create a .requested plan in the same transaction, so there can be a scenario where
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
# Job 1 schedules and creates a requested file with instant timestamp (x)
# Job 2 schedules and creates a requested file with instant timestamp (x-1)
# Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then Job 1 can run before Job 2 and create a compaction plan for all instant times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state since the compaction plan was supposed to include (x-1)
* There is a similar issue with clean. If Job 2 is a long-running commit (stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, then Job 1 can perform a clean that updates the earliest-commit-to-retain without waiting for Job 2's inflight instant at (x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second scenario with clean
!Flowchart.png!

One way this can be resolved is by combining the generation of the instant time and the creation of the requested file into the same HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled
[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krishen Bhan updated HUDI-7507:
---

Attachment: Flowchart (1).png

> ongoing concurrent writers with smaller timestamp can cause issues with
> table services
> ---
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
> Issue Type: Improvement
> Reporter: Krishen Bhan
> Priority: Major
> Attachments: Flowchart (1).png, Flowchart.png
>
> Although HUDI operations hold a table lock when creating a .requested
> instant, HUDI writers do not generate a timestamp and create a
> .requested plan in the same transaction, so there can be a scenario where
> # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses
> timestamp (x-1)
> # Job 1 schedules and creates a requested file with instant timestamp (x)
> # Job 2 schedules and creates a requested file with instant timestamp (x-1)
> # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can
> cause issues:
> * If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction,
> then Job 1 can run before Job 2 and create a compaction plan for all instant
> times (up to (x)) that doesn't include instant time (x-1). Later, Job 2
> will create instant time (x-1), but the timeline will be in a corrupted state
> since the compaction plan was supposed to include (x-1)
> * There is a similar issue with clean. If Job 2 is a long-running commit
> (stuck/delayed for a while before creating its .requested plan) and
> Job 1 is a clean, then Job 1 can perform a clean that updates the
> earliest-commit-to-retain without waiting for Job 2's inflight instant
> at (x-1) to complete. This causes Job 2 to be "skipped" by clean.
> [Edit] I added a diagram to visualize the issue, specifically the second
> scenario with clean
> !Flowchart.png!
>
> One way this can be resolved is by combining the generation of the instant
> time and the creation of the requested file into the same HUDI table
> transaction. Specifically, the following steps would be executed whenever
> any instant (commit, table service, etc.) is scheduled
> # Acquire the table lock
> # Look at the latest instant C on the active timeline (completed or not).
> Generate a timestamp after C
> # Create the plan and requested file using this new timestamp (which is
> greater than C)
> # Release the table lock
> Unfortunately this has the following drawbacks
> * Every operation must now hold the table lock while computing its plan, even
> if it's an expensive operation that takes a while
> * Users of HUDI cannot easily set their own instant time for an operation,
> and this restriction would break any public APIs that allow this
> An alternate approach (suggested by [~pwason]) was to instead have all
> operations, including table services, perform conflict resolution checks
> before committing. For example, clean and compaction would generate their
> plan as usual. But when creating a transaction to write a .requested file,
> right before creating the file they should check whether another
> lower-timestamp instant has appeared in the timeline. If so, they should
> fail/abort without creating the plan. Commit operations would also be
> updated/verified to have a similar check: before creating a .requested file
> (during a transaction) the commit operation would check whether a table
> service plan (clean/compact) with a greater instant time has been created.
> If so, it would abort/fail. This avoids the drawbacks of the first approach,
> but will lead to more transient failures that users have to handle.
[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Krishen Bhan updated HUDI-7507: --- Attachment: Flowchart.png Description: Although HUDI operations hold a table lock when creating a .requested instant, because HUDI writers do not generate a timestamp and create a .requsted plan in the same transaction, there can be a scenario where # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp (x - 1) # Job 1 schedules and creates requested file with instant timestamp (x) # Job 2 schedules and creates requested file with instant timestamp (x-1) # Both jobs continue running If one job is writing a commit and the other is a table service, this can cause issues: * ** If Job 2 is ingestion commit and Job 1 is compaction/log compaction, then when Job 1 runs before Job 2 and can create a compaction plan for all instant times (up to (x) ) that doesn’t include instant time (x-1) . Later Job 2 will create instant time (x-1), but timeline will be in a corrupted state since compaction plan was supposed to include (x-1) ** There is a similar issue with clean. If Job2 is a long-running commit (that was stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, then Job 1 can perform a clean that updates the earliest-commit-to-retain without waiting for the inflight instant by Job 2 at (x-1) to complete. This causes Job2 to be "skipped" by clean. [Edit] I added a diagram to visualize the issue, specifically the second scenario with clean !Flowchart.png! One way this can be resolved is by combining the operations of generating instant time and creating a requested file in the same HUDI table transaction. Specifically, executing the following steps whenever any instant (commit, table service, etc) is scheduled # Acquire table lock # Look at the latest instant C on the active timeline (completed or not). Generate a timestamp after C # Create the plan and requested file using this new timestamp ( that is greater than C) # Release table lock Unfortunately this has the following drawbacks * Every operation must now hold the table lock when computing its plan, even if its an expensive operation and will take a while * Users of HUDI cannot easily set their own instant time of an operation, and this restriction would break any public APIs that allow this An alternate approach (suggested by [~pwason] ) was to instead have all operations including table services perform conflict resolution checks before committing. For example, clean and compaction would generate their plan as usual. But when creating a transaction to write a .requested file, right before creating the file they should check if another lower timestamp instant has appeared in the timeline. And if so, they should fail/abort without creating the plan. Commit operations would also be updated/verified to have similar check, before creating a .requested file (during a transaction) the commit operation will check if a table service plan (clean/compact) with a greater instant time has been created. And if so, would abort/fail. This avoids the drawbacks of the first approach, but will lead to more transient failures that users have to handle. 
was: Although HUDI operations hold a table lock when creating a .requested instant, because HUDI writers do not generate a timestamp and create a .requested plan in the same transaction, there can be a scenario where # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x - 1) # Job 1 schedules and creates a requested file with instant timestamp (x) # Job 2 schedules and creates a requested file with instant timestamp (x-1) # Both jobs continue running If one job is writing a commit and the other is a table service, this can cause issues: ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then if Job 1 runs before Job 2, it can create a compaction plan for all instant times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state since the compaction plan was supposed to include (x-1) ** There is a similar issue with clean. If Job 2 is a long-running commit (that was stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, then Job 1 can perform a clean that updates the earliest-commit-to-retain without waiting for the inflight instant by Job 2 at (x-1) to complete. This causes Job 2 to be "skipped" by clean. One way this can be resolved is by combining the operations of generating the instant time and creating the requested file in the same HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled # Acquire the table lock # Look at the latest instant C on the active timeline (completed
[jira] [Created] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
Krishen Bhan created HUDI-7507: -- Summary: ongoing concurrent writers with smaller timestamp can cause issues with table services Key: HUDI-7507 URL: https://issues.apache.org/jira/browse/HUDI-7507 Project: Apache Hudi Issue Type: Improvement Reporter: Krishen Bhan Although HUDI operations hold a table lock when creating a .requested instant, because HUDI writers do not generate a timestamp and create a .requested plan in the same transaction, there can be a scenario where # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x - 1) # Job 1 schedules and creates a requested file with instant timestamp (x) # Job 2 schedules and creates a requested file with instant timestamp (x-1) # Both jobs continue running If one job is writing a commit and the other is a table service, this can cause issues: ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then if Job 1 runs before Job 2, it can create a compaction plan for all instant times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state since the compaction plan was supposed to include (x-1) ** There is a similar issue with clean. If Job 2 is a long-running commit (that was stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, then Job 1 can perform a clean that updates the earliest-commit-to-retain without waiting for the inflight instant by Job 2 at (x-1) to complete. This causes Job 2 to be "skipped" by clean. One way this can be resolved is by combining the operations of generating the instant time and creating the requested file in the same HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled # Acquire the table lock # Look at the latest instant C on the active timeline (completed or not). Generate a timestamp after C # Create the plan and requested file using this new timestamp (that is greater than C) # Release the table lock Unfortunately this has the following drawbacks * Every operation must now hold the table lock when computing its plan, even if it's an expensive operation and will take a while * Users of HUDI cannot easily set their own instant time of an operation, and this restriction would break any public APIs that allow this An alternate approach (suggested by [~pwason]) was to instead have all operations, including table services, perform conflict resolution checks before committing. For example, clean and compaction would generate their plan as usual. But when creating a transaction to write a .requested file, right before creating the file they should check if another lower-timestamp instant has appeared in the timeline, and if so, they should fail/abort without creating the plan. Commit operations would also be updated/verified to have a similar check: before creating a .requested file (during a transaction) the commit operation will check if a table service plan (clean/compact) with a greater instant time has been created, and if so, would abort/fail. This avoids the drawbacks of the first approach, but will lead to more transient failures that users have to handle. -- This message was sent by Atlassian Jira (v8.20.10#820010)
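A minimal Java sketch of the alternate approach just described: re-check the timeline under the table lock right before persisting the .requested file, and abort with a retryable exception on conflict. conflictsWith() and createRequestedFile() are hypothetical helpers; the type-specific conflict rules (e.g. "a commit with a smaller timestamp" for a compaction plan, "a table service plan with a greater timestamp" for a commit) would live inside conflictsWith(). Exception and timeline method names approximate current Hudi:
{code:java}
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.exception.HoodieWriteConflictException;

static void createRequestedIfNoConflict(TransactionManager txnManager,
                                        HoodieTableMetaClient metaClient,
                                        String candidateInstantTime,
                                        HoodieRequestedPlan plan) {   // hypothetical plan type
  txnManager.beginTransaction();
  try {
    metaClient.reloadActiveTimeline();            // see the newest instants
    boolean conflict = metaClient.getActiveTimeline().getInstants()
        .anyMatch(instant -> conflictsWith(instant, candidateInstantTime)); // hypothetical predicate
    if (conflict) {
      // retryable: the caller can regenerate its plan with a fresh timestamp
      throw new HoodieWriteConflictException(
          "Instant " + candidateInstantTime + " conflicts with a newer instant on the timeline");
    }
    createRequestedFile(metaClient, candidateInstantTime, plan);      // hypothetical
  } finally {
    txnManager.endTransaction();
  }
}
{code}
The expensive plan generation happens before the lock is taken; only the cheap timeline re-check and file creation are serialized, which is why this variant trades lock contention for occasional transient failures.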
[jira] [Updated] (HUDI-7503) concurrent executions of compaction plan should not corrupt dataset
[ https://issues.apache.org/jira/browse/HUDI-7503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Krishen Bhan updated HUDI-7503: --- Description: Currently it is not safe for 2+ writers to concurrently call `org.apache.hudi.client.BaseHoodieTableServiceClient#compact` on the same compaction instant. This is because one writer might execute the instant and create an inflight, while the other writer sees the inflight and tries to roll it back before re-attempting to execute it (since it will assume said inflight was a previously failed compaction attempt). This logic should be updated such that only one writer will actually execute the compaction plan at a time (and the others will fail/abort). One approach is to use a transaction (base table lock) in conjunction with heartbeating, to ensure that the writer triggers a heartbeat before executing compaction, and any concurrent writers will use the heartbeat to check whether the compaction is currently being executed by another writer. Specifically, the compact API should execute the following steps # Get the instant to compact C (as usual) # Start a transaction # Check if C has an active heartbeat; if so, finish the transaction and throw an exception # Start a heartbeat for C (this will implicitly re-start the heartbeat if it has been started before by another job) # Finish the transaction # Run the existing compact API logic on C # If execution succeeds, clean up the heartbeat file. If it fails, do nothing (as the heartbeat will expire automatically later). Note that this approach only holds the table lock temporarily, when checking/starting the heartbeat. Also, this flow can be applied to the execution of clean plans and other table services was: Currently it is not safe for 2+ writers to concurrently call `org.apache.hudi.client.BaseHoodieTableServiceClient#compact` on the same compaction instant. This is because one writer might execute the instant and create an inflight, while the other writer sees the inflight and tries to roll it back before re-attempting to execute it (since it will assume said inflight was a previously failed compaction attempt). This logic should be updated such that only one writer will actually execute the compaction plan at a time (and the others will fail/abort). One approach is to use a transaction (base table lock) in conjunction with heartbeating, to ensure that the writer triggers a heartbeat before executing compaction, and any concurrent writers will use the heartbeat to check whether the compaction is currently being executed by another writer. Specifically, the compact API should execute the following steps # Get the instant to compact C (as usual) # Start a transaction # Check if C has an active heartbeat; if so, finish the transaction and throw an exception # Start a heartbeat for C (this will implicitly re-start the heartbeat if it has been started before by another job) # Finish the transaction # Run the existing compact API logic on C # If execution succeeds, clean up the heartbeat file. If it fails, do nothing (as the heartbeat will expire automatically later).
Note that this approach only holds the table lock temporarily, when checking/starting the heartbeat > concurrent executions of compaction plan should not corrupt dataset > --- > > Key: HUDI-7503 > URL: https://issues.apache.org/jira/browse/HUDI-7503 > Project: Apache Hudi > Issue Type: Improvement > Reporter: Krishen Bhan > Priority: Minor > > Currently it is not safe for 2+ writers to concurrently call > `org.apache.hudi.client.BaseHoodieTableServiceClient#compact` on the same > compaction instant. This is because one writer might execute the instant and > create an inflight, while the other writer sees the inflight and tries to > roll it back before re-attempting to execute it (since it will assume said > inflight was a previously failed compaction attempt). > This logic should be updated such that only one writer will actually execute > the compaction plan at a time (and the others will fail/abort). > One approach is to use a transaction (base table lock) in conjunction with > heartbeating, to ensure that the writer triggers a heartbeat before executing > compaction, and any concurrent writers will use the heartbeat to check whether > the compaction is currently being executed by another writer. Specifically, > the compact API should execute the following steps > # Get the instant to compact C (as usual) > # Start a transaction > # Check if C has an active heartbeat; if so, finish the transaction and throw an > exception > # Start a heartbeat for C (this will implicitly re-start the heartbeat if it > has been started before by another job) > # Finish the transaction > # Run the existing compact API logic on C
[jira] [Created] (HUDI-7503) concurrent executions of compaction plan should not corrupt dataset
Krishen Bhan created HUDI-7503: -- Summary: concurrent executions of compaction plan should not corrupt dataset Key: HUDI-7503 URL: https://issues.apache.org/jira/browse/HUDI-7503 Project: Apache Hudi Issue Type: Improvement Reporter: Krishen Bhan Currently it is not safe for 2+ writers to concurrently call `org.apache.hudi.client.BaseHoodieTableServiceClient#compact` on the same compaction instant. This is because one writer might execute the instant and create an inflight, while the other writer sees the inflight and tries to roll it back before re-attempting to execute it (since it will assume said inflight was a previously failed compaction attempt). This logic should be updated such that only one writer will actually execute the compaction plan at a time (and the others will fail/abort). One approach is to use a transaction (base table lock) in conjunction with heartbeating, to ensure that the writer triggers a heartbeat before executing compaction, and any concurrent writers will use the heartbeat to check whether the compaction is currently being executed by another writer. Specifically, the compact API should execute the following steps # Get the instant to compact C (as usual) # Start a transaction # Check if C has an active heartbeat; if so, finish the transaction and throw an exception # Start a heartbeat for C (this will implicitly re-start the heartbeat if it has been started before by another job) # Finish the transaction # Run the existing compact API logic on C # If execution succeeds, clean up the heartbeat file. If it fails, do nothing (as the heartbeat will expire automatically later). Note that this approach only holds the table lock temporarily, when checking/starting the heartbeat -- This message was sent by Atlassian Jira (v8.20.10#820010)
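A minimal Java sketch of the heartbeat-guarded compact flow from the steps above, assuming Hudi's HoodieHeartbeatClient and TransactionManager (method names approximate; txnManager, heartbeatClient, writeClient, and compactionInstant are assumed to be in scope). Only the heartbeat check/start happens under the table lock; the expensive compaction itself runs outside it:
{code:java}
String instantTime = compactionInstant.getTimestamp();    // 1. instant to compact, as usual
txnManager.beginTransaction();                            // 2. start a transaction
try {
  if (!heartbeatClient.isHeartbeatExpired(instantTime)) {
    // 3. another writer is executing this same plan right now
    throw new HoodieLockException(
        "Compaction " + instantTime + " is already being executed by another writer");
  }
  heartbeatClient.start(instantTime);                     // 4. (re)start the heartbeat
} finally {
  txnManager.endTransaction();                            // 5. finish the transaction
}
writeClient.compact(instantTime);                         // 6. existing compact logic
heartbeatClient.stop(instantTime);                        // 7. clean up on success only;
// on failure, do nothing and let the heartbeat expire on its own later
{code}
The same check/start/stop envelope generalizes to clean and other table service executions, as the description notes.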
[jira] [Updated] (HUDI-7337) Implement MetricsReporter for M3
[ https://issues.apache.org/jira/browse/HUDI-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Krishen Bhan updated HUDI-7337: --- Description: Add a new implementation of org.apache.hudi.metrics.MetricsReporter that reports metrics to [M3|https://m3db.io/]. This can be achieved by using the Java library [https://github.com/uber-java/tally/tree/master] (was: Add a new implementation of MetricsReport that reports metrics to [M3|https://m3db.io/]. This can be achieved by using the Java library https://github.com/uber-java/tally/tree/master) > Implement MetricsReporter for M3 > > > Key: HUDI-7337 > URL: https://issues.apache.org/jira/browse/HUDI-7337 > Project: Apache Hudi > Issue Type: Wish > Components: metrics >Reporter: Krishen Bhan >Priority: Trivial > > Add a new implementation of org.apache.hudi.metrics.MetricsReporter that > reports metrics to [M3|https://m3db.io/]. This can be achieved by using the > Java library [https://github.com/uber-java/tally/tree/master] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7337) Implement MetricsReporter for M3
Krishen Bhan created HUDI-7337: -- Summary: Implement MetricsReporter for M3 Key: HUDI-7337 URL: https://issues.apache.org/jira/browse/HUDI-7337 Project: Apache Hudi Issue Type: Wish Components: metrics Reporter: Krishen Bhan Add a new implementation of MetricsReporter that reports metrics to [M3|https://m3db.io/]. This can be achieved by using the Java library https://github.com/uber-java/tally/tree/master -- This message was sent by Atlassian Jira (v8.20.10#820010)
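A hypothetical skeleton of the requested reporter: extend Hudi's MetricsReporter and let a tally Scope flush to M3 in the background. The tally types (RootScopeBuilder, M3Reporter, Duration) follow the uber-java/tally README; the exact wiring, the endpoint address, and the set of base-class overrides (e.g. whether a getReporter() is also required) are assumptions, not the final implementation:
{code:java}
import com.uber.m3.tally.RootScopeBuilder;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.m3.M3Reporter;
import com.uber.m3.util.Duration;
import java.net.InetSocketAddress;
import org.apache.hudi.metrics.MetricsReporter;

public class M3MetricsReporter extends MetricsReporter {
  private Scope scope;

  @Override
  public void start() {
    M3Reporter m3 = new M3Reporter.Builder(
        new InetSocketAddress("m3-collector.local", 9052))  // assumed endpoint
        .service("hudi")
        .build();
    // Flush buffered counters/gauges/timers to M3 every 30 seconds
    this.scope = new RootScopeBuilder().reporter(m3).reportEvery(Duration.ofSeconds(30));
  }

  @Override
  public void report() {
    // tally flushes on its own schedule; nothing extra per report() tick
  }

  @Override
  public void stop() {
    try {
      scope.close();  // flush remaining metrics before shutdown
    } catch (Exception e) {
      // best-effort cleanup
    }
  }
}
{code}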
[jira] [Created] (HUDI-7316) AbstractHoodieLogRecordReader should accept already-constructed HoodieTableMetaClient in order to reduce occurrences of file listing calls when reloading active timeline
Krishen Bhan created HUDI-7316: -- Summary: AbstractHoodieLogRecordReader should accept already-constructed HoodieTableMetaClient in order to reduce occurrences of file listing calls when reloading active timeline Key: HUDI-7316 URL: https://issues.apache.org/jira/browse/HUDI-7316 Project: Apache Hudi Issue Type: Improvement Reporter: Krishen Bhan Currently some implementors of AbstractHoodieLogRecordReader create a HoodieTableMetaClient on construction, which implicitly reloads the active timeline, causing a {{listStatus}} HDFS call. Since these are created in Spark executors when using the Spark engine, a Spark user may have hundreds to thousands of executors making a {{listStatus}} call at the same time (during a Spark stage). To avoid these redundant calls to the HDFS NameNode (or any distributed filesystem service in general), users of AbstractHoodieLogRecordReader and its implementations should pass in an already-constructed HoodieTableMetaClient. As long as the caller passes in a HoodieTableMetaClient with the active timeline already loaded, and the implementation doesn't need to reload the timeline (such as to get a fresher view), these calls are avoided in the executor without making the logic incorrect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
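A sketch of the proposed pattern, using Hudi's real meta-client builder: construct the HoodieTableMetaClient (and load the active timeline) once, then hand it to each log record scanner. withHoodieTableMetaClient() is the hypothetical setter this ticket proposes; the other builder calls exist in current Hudi:
{code:java}
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;

// Pay the listStatus cost once, up front
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(hadoopConf)
    .setBasePath(basePath)
    .setLoadActiveTimelineOnLoad(true)
    .build();

// Per file-slice work (e.g. in a Spark executor) then reuses the client,
// so no additional timeline reload / listStatus call hits the NameNode
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
    .withHoodieTableMetaClient(metaClient)   // hypothetical, from this ticket
    .withLogFilePaths(logFilePaths)
    .withLatestInstantTime(latestCommitTime)
    .build();
{code}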
[jira] [Created] (HUDI-7313) ZookeeperBasedLockProvider should store application info in lock node
Krishen Bhan created HUDI-7313: -- Summary: ZookeeperBasedLockProvider should store application info in lock node Key: HUDI-7313 URL: https://issues.apache.org/jira/browse/HUDI-7313 Project: Apache Hudi Issue Type: Improvement Components: metrics, multi-writer Reporter: Krishen Bhan Currently, when ZookeeperBasedLockProvider acquires a lock, it does not provide information on the lock holder via [https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/locks/InterProcessMutex.html#getLockNodeBytes|https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/locks/InterProcessMutex.html#getLockNodeBytes--], which can be used to store info about the application that acquired the lock. Updating HUDI to implement this API would help users easily identify the application that acquired the ZooKeeper lock when they use ZooKeeper tooling (such as zkCli). -- This message was sent by Atlassian Jira (v8.20.10#820010)
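A sketch of the suggested change: Curator's InterProcessMutex exposes a protected getLockNodeBytes() hook whose return value Curator writes into the lock znode, so a subclass can surface application info there for zkCli users. The class name and JSON payload are illustrative:
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class ApplicationInfoMutex extends InterProcessMutex {
  private final byte[] lockNodeBytes;

  public ApplicationInfoMutex(CuratorFramework client, String lockPath, String appInfoJson) {
    super(client, lockPath);
    // e.g. {"jobId":"...","host":"...","startTime":"..."}
    this.lockNodeBytes = appInfoJson.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  protected byte[] getLockNodeBytes() {
    // Stored in the ephemeral lock node; visible via `get <lock-node>` in zkCli
    return lockNodeBytes;
  }
}
{code}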
[jira] [Updated] (HUDI-7308) LockManager::unlock should not call updateLockHeldTimerMetrics if lockDurationTimer has not been started
[ https://issues.apache.org/jira/browse/HUDI-7308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Krishen Bhan updated HUDI-7308: --- Status: In Progress (was: Open) > LockManager::unlock should not call updateLockHeldTimerMetrics if > lockDurationTimer has not been started > > > Key: HUDI-7308 > URL: https://issues.apache.org/jira/browse/HUDI-7308 > Project: Apache Hudi > Issue Type: Bug > Components: metrics, multi-writer > Affects Versions: 1.0.0-beta1 > Reporter: Krishen Bhan > Priority: Trivial > > If an exception is thrown in > org.apache.hudi.client.transaction.lock.LockManager#lock it is possible for > lockDurationTimer in HoodieLockMetrics to be closed by the caller before it is > started, which throws and bubbles up a `HoodieException("Timer was not > started")` exception (rather than the actual exception that occurred when > trying to acquire the lock). Specifically, this can happen due to the following > scenario > # The BaseHoodieTableServiceClient calls > `org.apache.hudi.client.BaseHoodieTableServiceClient#completeClustering`, > which in turn calls > `org.apache.hudi.client.transaction.TransactionManager#beginTransaction` > within a `try` block > # During > `org.apache.hudi.client.transaction.TransactionManager#beginTransaction` the > LockManager lock API > `org.apache.hudi.client.transaction.lock.LockManager#lock` is called > # Inside `org.apache.hudi.client.transaction.lock.LockManager#lock`, the > `java.util.concurrent.locks.Lock#tryLock(long, > java.util.concurrent.TimeUnit)` call throws some exception. Because of this > exception, the statement > `metrics.updateLockAcquiredMetric();` is not executed. This means that the > `org.apache.hudi.common.util.HoodieTimer#startTimer` method was never called > for the HoodieLockMetrics timer member variable > `org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics#lockDurationTimer` > # The exception in (3) bubbles back up to > `org.apache.hudi.client.BaseHoodieTableServiceClient#completeClustering`. > Since this is in a `try` block, the `catch` and `finally` blocks are > executed. When `finally` is executed though, > `org.apache.hudi.client.transaction.TransactionManager#endTransaction` is > called > # During > `org.apache.hudi.client.transaction.TransactionManager#endTransaction` the > LockManager unlock API > `org.apache.hudi.client.transaction.lock.LockManager#unlock` is called. > During the execution of `metrics.updateLockHeldTimerMetrics();`, the > method `org.apache.hudi.common.util.HoodieTimer#endTimer` is called for > `org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics#lockDurationTimer`. > This throws a `HoodieException("Timer was not started")`. This > is because the corresponding > `org.apache.hudi.common.util.HoodieTimer#startTimer` method was never called > The issue here is that the caller (`BaseHoodieTableServiceClient` in this > case) should have ended up re-throwing the exception thrown in (3) while > trying to start the transaction in > `org.apache.hudi.client.transaction.TransactionManager#beginTransaction`. > Instead though, because the caller safely "cleaned up" by calling > `org.apache.hudi.client.transaction.TransactionManager#endTransaction` (in a > `finally`), the `HoodieException("Timer was not started")` exception was > raised instead, suppressing the exception from (3), which is the actual root > cause issue.
Instead, the execution of > `org.apache.hudi.client.transaction.TransactionManager#endTransaction` should > have completed without throwing this additional exception, which would have > led the caller to throw the exception from (3) before exiting. > Although resolving this would not prevent the overall operation from failing, > it would provide better observability into the actual root-cause exception (the > one from (3)). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7308) LockManager::unlock should not call updateLockHeldTimerMetrics if lockDurationTimer has not been started
Krishen Bhan created HUDI-7308: -- Summary: LockManager::unlock should not call updateLockHeldTimerMetrics if lockDurationTimer has not been started Key: HUDI-7308 URL: https://issues.apache.org/jira/browse/HUDI-7308 Project: Apache Hudi Issue Type: Bug Components: metrics, multi-writer Affects Versions: 1.0.0-beta1 Reporter: Krishen Bhan If an exception is thrown in org.apache.hudi.client.transaction.lock.LockManager#lock it is possible for lockDurationTimer in HoodieLockMetrics to be closed by the caller before it is started, which throws and bubbles up a `HoodieException("Timer was not started")` exception (rather than the actual exception that occurred when trying to acquire the lock). Specifically, this can happen due to the following scenario # The BaseHoodieTableServiceClient calls `org.apache.hudi.client.BaseHoodieTableServiceClient#completeClustering`, which in turn calls `org.apache.hudi.client.transaction.TransactionManager#beginTransaction` within a `try` block # During `org.apache.hudi.client.transaction.TransactionManager#beginTransaction` the LockManager lock API `org.apache.hudi.client.transaction.lock.LockManager#lock` is called # Inside `org.apache.hudi.client.transaction.lock.LockManager#lock`, the `java.util.concurrent.locks.Lock#tryLock(long, java.util.concurrent.TimeUnit)` call throws some exception. Because of this exception, the statement `metrics.updateLockAcquiredMetric();` is not executed. This means that the `org.apache.hudi.common.util.HoodieTimer#startTimer` method was never called for the HoodieLockMetrics timer member variable `org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics#lockDurationTimer` # The exception in (3) bubbles back up to `org.apache.hudi.client.BaseHoodieTableServiceClient#completeClustering`. Since this is in a `try` block, the `catch` and `finally` blocks are executed. When `finally` is executed though, `org.apache.hudi.client.transaction.TransactionManager#endTransaction` is called # During `org.apache.hudi.client.transaction.TransactionManager#endTransaction` the LockManager unlock API `org.apache.hudi.client.transaction.lock.LockManager#unlock` is called. During the execution of `metrics.updateLockHeldTimerMetrics();`, the method `org.apache.hudi.common.util.HoodieTimer#endTimer` is called for `org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics#lockDurationTimer`. This throws a `HoodieException("Timer was not started")`. This is because the corresponding `org.apache.hudi.common.util.HoodieTimer#startTimer` method was never called The issue here is that the caller (`BaseHoodieTableServiceClient` in this case) should have ended up re-throwing the exception thrown in (3) while trying to start the transaction in `org.apache.hudi.client.transaction.TransactionManager#beginTransaction`. Instead though, because the caller safely "cleaned up" by calling `org.apache.hudi.client.transaction.TransactionManager#endTransaction` (in a `finally`), the `HoodieException("Timer was not started")` exception was raised instead, suppressing the exception from (3), which is the actual root cause issue. Instead, the execution of `org.apache.hudi.client.transaction.TransactionManager#endTransaction` should have completed without throwing this additional exception, which would have led the caller to throw the exception from (3) before exiting. Although resolving this would not prevent the overall operation from failing, it would provide better observability into the actual root-cause exception (the one from (3)).
-- This message was sent by Atlassian Jira (v8.20.10#820010)
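A sketch of one defensive fix for the unlock path described in HUDI-7308 above, assuming a hypothetical isStarted() accessor on HoodieTimer and field names (isMetricsEnabled, lockHeldTimer) that only approximate HoodieLockMetrics: report the lock-held duration only if the timer actually started, so unlock() after a failed acquisition cannot throw and suppress the root-cause exception:
{code:java}
import java.util.concurrent.TimeUnit;

public void updateLockHeldTimerMetrics() {
  if (isMetricsEnabled && lockDurationTimer.isStarted()) {   // hypothetical accessor
    long heldMs = lockDurationTimer.endTimer();              // safe: timer was started
    lockHeldTimer.update(heldMs, TimeUnit.MILLISECONDS);
  }
  // else: tryLock() failed before startTimer() ever ran; skip silently so the
  // caller's finally-block cleanup rethrows the original acquisition error
}
{code}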
[jira] [Comment Edited] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs
[ https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751866#comment-17751866 ] Krishen Bhan edited comment on HUDI-6596 at 8/17/23 7:47 PM: - I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this change on the hudi repo, but realized there was an issue, since the assumptions made in the rollback implementation (both the existing one and my proposed change) of {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option, java.lang.String)}} are inconsistent with the changes in [https://github.com/apache/hudi/pull/8849] Specifically, {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option, java.lang.String)}} seems to have been implemented (based on code and comments) under the assumption that a rollback operation will delete all instant files of {{commit instant to rollback}} before completing the rollback operation itself, which is what I had thought when I was working on my rollback fix(es). But it seems that after [https://github.com/apache/hudi/pull/8849] this is (retroactively) incorrect, as now we are deleting instant files of {{commit instant to rollback}} after completing the rollback instant, leaving the rollback operation as a special case where it is possible for the rollback instant to be complete even if the actual rollback operation has not fully completed (due to failing after completing the rollback instant but before cleaning up the instant files of {{commit instant to rollback}}). Although [https://github.com/apache/hudi/pull/8849] handles this by delegating the deletion of the instant files of {{commit instant to rollback}} to a subsequent clean/rollbackFailedWrites operation, I think the intention/invariants/rules of how rollback operates are a bit ambiguous to me and should be reconciled. To further add complexity, it seems that based on [https://github.com/kbuci/hudi/blob/35be9bbbc7ef7ae6ad0a4955da78da4c0463074f/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L630] it is also currently legal to remove a requested rollback plan, in other words "rolling back" a pending rollback plan.
Also, after taking another look at the reason for [https://github.com/apache/hudi/pull/8849], I think the fix there can be reverted and handled alternatively, since it seems to me that fixing/finding bugs with getPendingRollbackInfo and preventing concurrent rollback scheduling/execution might remove the underlying issue/reason for that PR in the first place was (Author: JIRAUSER301521): I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this change on the hudi repo, but realized there was an issue, since the assumptions made in the rollback implementation (both the existing one and my proposed change) of {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option, java.lang.String)}} are inconsistent with the changes in [https://github.com/apache/hudi/pull/8849] Specifically, {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option, java.lang.String)}} seems to have been implemented (based on code and comments) under the assumption that a rollback operation will delete all instant files of {{commit instant to rollback}} before completing the rollback operation itself, which is what I had thought when I was working on my rollback fix(es). But it seems that after [https://github.com/apache/hudi/pull/8849] this is (retroactively) incorrect, as now we are deleting instant files of {{commit instant to rollback}} after completing the rollback instant, leaving the rollback operation as a special case where it is possible for the rollback instant to be complete even if the actual rollback operation has not fully completed (due to failing after completing the rollback instant but before cleaning up the instant files of {{commit instant to rollback}}). Although [https://github.com/apache/hudi/pull/8849] handles this by delegating the deletion of the instant files of {{commit instant to rollback}} to a subsequent clean/rollbackFailedWrites operation, I think the intention/invariants/rules of how rollback operates are a bit ambiguous to me and should be reconciled. Also, after taking another look at the reason for [https://github.com/apache/hudi/pull/8849], I think the fix there can be reverted and handled alternatively, since it seems to me that fixing/finding bugs with getPendingRollbackInfo and preventing concurrent
[jira] [Commented] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs
[ https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755672#comment-17755672 ] Krishen Bhan commented on HUDI-6596: {quote} I think we should use a different name to skipLocking, if acquiring the lock is skipped because we already acquired the lock, then we should use a different variable something like isLockAcquired or something. {quote} I think that was the convention I noticed, but sure, I can address that once I post the PR for review, thanks! {quote} Without complicating the rollback logic, let us see all the cases where we use rollback. 1. Rollback failed writes: Lock has to be acquired until scheduling the rollback plans for pending instantsToRollback and for execution it need not acquire a lock. 2. Rollback a specific instant: Only schedule step needs to be under a lock. 3. Restore operation: Entire operation needs to be under a lock. For rollbackFailedWrites method, break it down to two Stages Stage 1: Scheduling stage Step 1: Acquire lock and reload active timeline Step 2: getInstantsToRollback Step 3: removeInflightFilesAlreadyRolledBack Step 4: getPendingRollbackInfos Step 5: Use existing plan or schedule rollback Step 6: Release lock Stage 2: Execution stage Step 7: Check if heartbeat exist for pending rollback plan. If yes abort else start an heartbeat and proceed further for executing it. {quote} For now, the intention in this ticket is to focus just on (2) `Rollback a specific instant`. Depending on how this implementation goes, I think we could follow your approach for (1) `rollbackFailedWrites` when I create a ticket to address that. Sorry, I should rename this JIRA ticket to clarify that. {quote} Rollback operation are not that common. We only do rollback if something fails. So, it is not like .clean or .commit operations. So, we should be ok in seeing some noise. {quote} The issue is that although the chance of an individual job transiently failing on an upsert is low, as we add more concurrent writers to our pool of upsert jobs on a dataset, the chance that at least one upsert job will fail increases. In addition, in the case of underlying infrastructure (like Spark/YARN) service degradations (which we've seen internally in our organization), it's possible that all writers fail during an upsert/rollback in the same window of time. This means that we should try to gracefully/resiliently account for the chance that there is a concurrent rollback going on during a job's upsert operation, or even a concurrent rollback that itself has failed. Although locking the table for the duration of a rollback is out of the question, we can still go with an approach like I suggested in https://issues.apache.org/jira/browse/HUDI-6596?focusedCommentId=17751201=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17751201 , to greatly reduce the chance that some sporadic rollback/failures will cause all concurrent upsert jobs to fail. > Propose rollback implementation changes to guard against concurrent jobs > - > > Key: HUDI-6596 > URL: https://issues.apache.org/jira/browse/HUDI-6596 > Project: Apache Hudi > Issue Type: Wish > Reporter: Krishen Bhan > Priority: Trivial > > h1.
Issue > The existing rollback API in 0.14 > [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877] > executes a rollback plan, either taking in an existing rollback plan > provided by the caller for a previous rollback attempt, or scheduling a > new rollback instant if none is provided. Currently it is not safe for two > concurrent jobs to call this API (when skipLocking=False and the callers > aren't already holding a lock), as this can lead to an issue where multiple > rollback requested plans are created or two jobs are executing the same > rollback instant at the same time. > h1. Proposed change > One way to resolve this issue is to refactor this rollback function such that > if skipLocking=false, the following steps are followed > # Acquire the table lock > # Reload the active timeline > # Look at the active timeline to see if there is an inflight rollback instant > from a previous rollback attempt; if it exists, then assign it as the > rollback plan to execute. Also, check if a pending rollback plan was passed > in by the caller. Then execute the following steps depending on whether the > caller passed a pending rollback instant plan. > ## [a] If a pending inflight rollback plan was passed in by the caller, then > check that there is a previous attempted rollback instant on the timeline (and > that the instant times match) and continue to use this rollback plan. If that > isn't the
[jira] [Comment Edited] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs
[ https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751866#comment-17751866 ] Krishen Bhan edited comment on HUDI-6596 at 8/8/23 11:48 PM: - I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this change on the hudi repo, but realized there was an issue, since the assumptions made in the rollback implementation (both the existing one and my proposed change) of {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option, java.lang.String)}} are inconsistent with the changes in [https://github.com/apache/hudi/pull/8849] Specifically, {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option, java.lang.String)}} seems to have been implemented (based on code and comments) under the assumption that a rollback operation will delete all instant files of {{commit instant to rollback}} before completing the rollback operation itself, which is what I had thought when I was working on my rollback fix(es). But it seems that after [https://github.com/apache/hudi/pull/8849] this is (retroactively) incorrect, as now we are deleting instant files of {{commit instant to rollback}} after completing the rollback instant, leaving the rollback operation as a special case where it is possible for the rollback instant to be complete even if the actual rollback operation has not fully completed (due to failing after completing the rollback instant but before cleaning up the instant files of {{commit instant to rollback}}). Although [https://github.com/apache/hudi/pull/8849] handles this by delegating the deletion of the instant files of {{commit instant to rollback}} to a subsequent clean/rollbackFailedWrites operation, I think the intention/invariants/rules of how rollback operates are a bit ambiguous to me and should be reconciled. Also, after taking another look at the reason for [https://github.com/apache/hudi/pull/8849], I think the fix there can be reverted and handled alternatively, since it seems to me that fixing/finding bugs with getPendingRollbackInfo and preventing concurrent rollback scheduling/execution might remove the underlying issue/reason for that PR in the first place was (Author: JIRAUSER301521): I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this change on the hudi repo, but realized there was an issue, since the assumptions made in the rollback implementation (both the existing one and my proposed change) of {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option, java.lang.String)}} are inconsistent with the changes in [https://github.com/apache/hudi/pull/8849] Specifically, {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option, java.lang.String)}} seems to have been implemented (based on code and comments) under the assumption that a rollback operation will delete all instant files of {{commit instant to rollback}} before completing the rollback operation itself, which is what I had thought when I was working on my rollback fix(es).
But it seems that after [https://github.com/apache/hudi/pull/8849] this is (retroactively) incorrect, as now we are deleting instant files of {{commit instant to rollback}} after completing the rollback instant, leaving the rollback operation as a special case where it is possible for the rollback instant to be complete even if the actual rollback operation has not fully completed (due to failing after completing the rollback instant but before cleaning up the instant files of {{commit instant to rollback}}). Although [https://github.com/apache/hudi/pull/8849] handles this by delegating the deletion of the instant files of {{commit instant to rollback}} to a subsequent clean/rollbackFailedWrites operation, I think the intention/invariants/rules of how rollback operates are a bit ambiguous to me and should be reconciled. > Propose rollback implementation changes to guard against concurrent jobs > - > > Key: HUDI-6596 > URL: https://issues.apache.org/jira/browse/HUDI-6596 > Project: Apache Hudi > Issue Type: Wish > Reporter: Krishen Bhan > Priority: Trivial > > h1. Issue > The existing rollback API in 0.14 > [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877] > executes a rollback
[jira] [Commented] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs
[ https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751866#comment-17751866 ] Krishen Bhan commented on HUDI-6596: I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this change on the hudi repo, but realized there was an issue, since the assumptions made in the rollback implementation (both the existing one and my proposed change) of {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option, java.lang.String)}} are inconsistent with the changes in [https://github.com/apache/hudi/pull/8849] Specifically, {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option, java.lang.String)}} seems to have been implemented (based on code and comments) under the assumption that a rollback operation will delete all instant files of {{commit instant to rollback}} before completing the rollback operation itself, which is what I had thought when I was working on my rollback fix(es). But it seems that after [https://github.com/apache/hudi/pull/8849] this is (retroactively) incorrect, as now we are deleting instant files of {{commit instant to rollback}} after completing the rollback instant, leaving the rollback operation as a special case where it is possible for the rollback instant to be complete even if the actual rollback operation has not fully completed (due to failing after completing the rollback instant but before cleaning up the instant files of {{commit instant to rollback}}). Although [https://github.com/apache/hudi/pull/8849] handles this by delegating the deletion of the instant files of {{commit instant to rollback}} to a subsequent clean/rollbackFailedWrites operation, I think the intention/invariants/rules of how rollback operates are a bit ambiguous to me and should be reconciled. > Propose rollback implementation changes to guard against concurrent jobs > - > > Key: HUDI-6596 > URL: https://issues.apache.org/jira/browse/HUDI-6596 > Project: Apache Hudi > Issue Type: Wish > Reporter: Krishen Bhan > Priority: Trivial > > h1. Issue > The existing rollback API in 0.14 > [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877] > executes a rollback plan, either taking in an existing rollback plan > provided by the caller for a previous rollback attempt, or scheduling a > new rollback instant if none is provided. Currently it is not safe for two > concurrent jobs to call this API (when skipLocking=False and the callers > aren't already holding a lock), as this can lead to an issue where multiple > rollback requested plans are created or two jobs are executing the same > rollback instant at the same time. > h1. Proposed change > One way to resolve this issue is to refactor this rollback function such that > if skipLocking=false, the following steps are followed > # Acquire the table lock > # Reload the active timeline > # Look at the active timeline to see if there is an inflight rollback instant > from a previous rollback attempt; if it exists, then assign it as the > rollback plan to execute. Also, check if a pending rollback plan was passed > in by the caller. Then execute the following steps depending on whether the > caller passed a pending rollback instant plan.
> ## [a] If a pending inflight rollback plan was passed in by the caller, then > check that there is a previous attempted rollback instant on the timeline (and > that the instant times match) and continue to use this rollback plan. If that > isn't the case, then raise a rollback exception, since this means another job > has concurrently already executed this plan. Note that in a valid HUDI > dataset there can be at most one rollback instant for a corresponding commit > instant, which is why if we no longer see a pending rollback in the timeline in > this phase we can safely assume that it has already been executed to > completion. > ## [b] If no pending inflight rollback plan was passed in by the caller and no > pending rollback instant was found in the timeline earlier, then schedule a new > rollback plan > # Now that a rollback plan and requested rollback instant time have been > assigned, check for an active heartbeat for the rollback instant time. If > there is one, then abort the rollback, as that means there is a concurrent job > executing that rollback. If not, then start a heartbeat for that rollback > instant time. > # Release the table lock > # Execute the rollback plan and complete the rollback instant.
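A condensed Java sketch of the proposed skipLocking=false flow above (steps 1-6). HoodiePendingRollbackInfo, TransactionManager, and the heartbeat client are real Hudi types with approximate method names; findPendingRollback(), scheduleRollback(), and executeRollback() are hypothetical stand-ins for the existing logic:
{code:java}
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieRollbackException;

String rollbackInstantTime;
txnManager.beginTransaction();                                        // 1
try {
  table.getMetaClient().reloadActiveTimeline();                       // 2
  Option<HoodiePendingRollbackInfo> pending =
      findPendingRollback(table.getMetaClient(), commitToRollback);   // 3 (hypothetical)
  if (callerPlan.isPresent() && !pending.isPresent()) {
    // 3a: the caller's plan is gone from the timeline, so another job
    // must already have executed it to completion
    throw new HoodieRollbackException("Rollback already executed concurrently");
  }
  rollbackInstantTime = pending.isPresent()
      ? pending.get().getRollbackInstant().getTimestamp()
      : scheduleRollback(table, commitToRollback);                    // 3b (hypothetical)
  if (!heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
    throw new HoodieRollbackException("Concurrent rollback in progress"); // 4
  }
  heartbeatClient.start(rollbackInstantTime);                         // 4
} finally {
  txnManager.endTransaction();                                        // 5
}
try {
  executeRollback(table, rollbackInstantTime);                        // 6 (hypothetical)
} finally {
  heartbeatClient.stop(rollbackInstantTime);  // close the heartbeat on success or failure
}
{code}
Note that the table lock covers only the cheap timeline/heartbeat checks (steps 1-4), while the rollback execution itself runs unlocked and is fenced solely by the heartbeat.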
[jira] [Commented] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs
[ https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751201#comment-17751201 ] Krishen Bhan commented on HUDI-6596: Sorry, I realized that I might not have fully addressed your concern about idempotency/concurrent retries. Currently with my proposal, if concurrent jobs try to execute a rollback instant around the same time, even though only one job will execute the rollback at a time, the other jobs will all instantly fail, which may not always be desirable (as that exception might propagate upwards and cause the entire job to fail, creating more noise). In order to minimize this, I think I am going to revise my approach to - Add a new custom write config value "time to wait for rollback" - Remove step 4 - Add the following step in between 5 and 6: {code:java} Acquire the table lock, check if the rollback heartbeat has expired, and if so start the rollback heartbeat again and release the lock. Let's call this "claim the heartbeat". If the rollback heartbeat is still active, then - If "time to wait for rollback" isn't enabled, then raise an exception and release the lock - If "time to wait for rollback" is enabled, then release the table lock and keep on trying (every x seconds) to "claim the heartbeat". If we are not able to "claim the heartbeat" within "time to wait for rollback" seconds then we raise an exception, like the previous case. Otherwise, if we successfully "claim the heartbeat", then this means the recent rollback attempt either finished or failed. In order to determine which, we reload the active timeline again. If the pending rollback instant is still in the timeline, we execute it, otherwise we safely return false. Note that although we are repeatedly holding and releasing the table lock every time we try to "claim the heartbeat", we are only making 1-2 DFS calls (to check the heartbeat file and create it if possible) while the lock is held.{code} (A rough Java sketch of this claim loop follows the quoted description below.) Although this does unfortunately add some code complexity, I plan to add this to my organization's internal HUDI fork, since I think it will be helpful for us to reduce the frequency of failures, and I figured I should share this here (and on the PR I will create) in case it might be useful for other Apache HUDI users. > Propose rollback implementation changes to guard against concurrent jobs > - > > Key: HUDI-6596 > URL: https://issues.apache.org/jira/browse/HUDI-6596 > Project: Apache Hudi > Issue Type: Wish > Reporter: Krishen Bhan > Priority: Trivial > > h1.
Proposed change > One way to resolve this issue is to refactor this rollback function such that > if skipLocking=false, the following steps are followed > # Acquire the table lock > # Reload the active timeline > # Look at the active timeline to see if there is an inflight rollback instant > from a previous rollback attempt; if it exists, then assign it as the > rollback plan to execute. Also, check if a pending rollback plan was passed > in by the caller. Then execute the following steps depending on whether the > caller passed a pending rollback instant plan. > ## [a] If a pending inflight rollback plan was passed in by the caller, then > check that there is a previous attempted rollback instant on the timeline (and > that the instant times match) and continue to use this rollback plan. If that > isn't the case, then raise a rollback exception, since this means another job > has concurrently already executed this plan. Note that in a valid HUDI > dataset there can be at most one rollback instant for a corresponding commit > instant, which is why if we no longer see a pending rollback in the timeline in > this phase we can safely assume that it has already been executed to > completion. > ## [b] If no pending inflight rollback plan was passed in by the caller and no > pending rollback instant was found in the timeline earlier, then schedule a new > rollback plan > # Now that a rollback plan and requested rollback instant time have been > assigned, check for an active heartbeat for the rollback instant time.
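A rough Java sketch of the "claim the heartbeat" loop proposed in the comment above: poll under short-lived table locks until the previous owner's heartbeat expires or a wait budget runs out. The "time to wait for rollback" config accessor and the poll interval are hypothetical names for the proposed write config:
{code:java}
import org.apache.hudi.exception.HoodieRollbackException;

long deadlineMs = System.currentTimeMillis() + config.getRollbackWaitMs(); // hypothetical config
while (true) {
  txnManager.beginTransaction();
  try {
    if (heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
      heartbeatClient.start(rollbackInstantTime);   // heartbeat claimed
      break;
    }
  } finally {
    txnManager.endTransaction();   // only 1-2 DFS calls happen while the lock is held
  }
  if (System.currentTimeMillis() >= deadlineMs) {
    throw new HoodieRollbackException("Timed out waiting to claim rollback heartbeat");
  }
  try {
    Thread.sleep(pollIntervalMs);  // retry every x seconds
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new HoodieRollbackException("Interrupted while waiting for rollback heartbeat", e);
  }
}
// After claiming: reload the active timeline and execute the rollback only if it
// is still pending; otherwise another job already completed it, so return false.
{code}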
[jira] [Comment Edited] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs
[ https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750879#comment-17750879 ] Krishen Bhan edited comment on HUDI-6596 at 8/3/23 8:15 PM: Thanks for the reply! > The table lock could become a bottleneck, potentially leading to performance > issues as other operations might be blocked too. It might be useful to > consider how frequently you expect concurrent rollbacks to occur and whether > this might create a performance problem. Based on our use case at least, I would expect attempted concurrent rollbacks to be uncommon / an edge case for now, and rarer as my organization internally makes more fixes/changes to our orchestration process. But you bring up a good concern: because this locks around the scheduleRollback call, it will block other HUDI writers from progressing while it generates the rollback plan, which might be relatively time-consuming (since with the Spark engine, HUDI 0.10 can launch a new Spark stage for this), at least compared to other places where HUDI holds the table lock. If this becomes a concern for other HUDI users (and we want HUDI OCC locking to avoid holding the table lock while reading/creating an arbitrary/unbounded number of instants/data files in parallel), one solution I can think of would be to refactor the HoodieTable scheduleRollback call such that we can first create the rollback plan before acquiring the table lock (in my proposed approach), and then later, if we actually call HoodieTable scheduleRollback, we just pass in this existing rollback plan. This way the actual "work" needed to generate the rollback plan is done before locking, and we only check and update instant file(s) while under the lock (and since in my proposed approach we anyway only schedule a rollback if none has ever been scheduled before, I think we shouldn't have to worry about the rollback plan we created becoming invalid/stale). Though I'm not sure how feasible this is, since it may affect public APIs. > Can we ensure rollbacks are idempotent in case of repeated failures or > retries? Yes, making sure rollbacks are idempotent (in the sense that a pending rollback can keep on being retried until success) is a must. Both the current/original implementation and the proposed approach should address this. Both approaches/implementations handle the case where the rollback is pending but the instant to roll back is gone (where we need to re-materialize the instant info and tell the rollback execution to not actually delete instants). Also, in the proposed approach here, in step 3 we are directly using the pending rollback instant that we observe in the timeline, if one exists. Unfortunately the logic and my phrasing for step 3 are a bit awkward: because the caller can pass in a pendingRollbackInfo that it expects to be executed, I decided that we couldn't just ignore it, but rather we needed to validate that this pendingRollbackInfo is the same as the pending rollback instant we just saw in the timeline, and abort the rollback if not. > Worth considering edge cases where heartbeats could become stale or be missed > (e.g., if a job crashes without properly closing its heartbeat). Handling > these scenarios gracefully will help ensure that rollbacks can still proceed > when needed. Yeah, if a failed rollback job doesn't clean up the heartbeat after failure, any rollback attempt right after (within the interval of `[heartbeat timeout * (allowed heartbeat misses + 1)]`, I think) will fail.
The alternative (that I can think of) would be to simply allow for the chance of 2+ rollback jobs working on the same rollback instant. The issue though is that even if the HoodieTable executeRollback implementation prevents the dataset from being corrupted and will just produce a retry-able failure, I thought that it would be noisy/tricky for a user to understand/debug. So I decided to add Step 4, since from my perspective/understanding it was a tradeoff between "always failing with an easy-to-understand retry-able exception" versus "rarely failing with a hard-to-understand retry-able exception". was (Author: JIRAUSER301521): Thanks for the reply! > The table lock could become a bottleneck, potentially leading to performance > issues as other operations might be blocked too. It might be useful to > consider how frequently you expect concurrent rollbacks to occur and whether > this might create a performance problem. Based on our use case at least, I would expect attempted concurrent rollbacks to be uncommon / an edge case for now, and rarer as my organization internally makes more fixes/changes to our orchestration process. But you bring up a good concern: because this locks around the scheduleRollback call, it will block other HUDI writers from progressing while it generates the rollback plan,
[jira] [Commented] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs
[ https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750879#comment-17750879 ] Krishen Bhan commented on HUDI-6596: Thanks for the reply! > The table lock could become a bottleneck, potentially leading to performance > issues as other operations might be blocked too. It might be useful to > consider how frequently you expect concurrent rollbacks to occur and whether > this might create a performance problem. Based on our use case at least, I would expect attempted concurrent rollbacks to be uncommon / an edge case for now, and rarer as my organization internally makes more fixes/changes to our orchestration process. But you bring up a good concern: because this locks around the scheduleRollback call, it will block other HUDI writers from progressing while it generates the rollback plan, which might be relatively time-consuming (since with the Spark engine, HUDI 0.10 can launch a new Spark stage for this), at least compared to other places where HUDI holds the table lock. If this becomes a concern for other HUDI users (and we want HUDI OCC locking to avoid holding the table lock while reading/creating an arbitrary/unbounded number of instants/data files in parallel), one solution I can think of would be to refactor the HoodieTable scheduleRollback call such that we can first create the rollback plan before acquiring the table lock (in my proposed approach), and then later, if we actually call HoodieTable scheduleRollback, we just pass in this existing rollback plan. This way the actual "work" needed to generate the rollback plan is done before locking, and we only check and update instant file(s) while under the lock (and since in my proposed approach we anyway only schedule a rollback if none has ever been scheduled before, I think we shouldn't have to worry about the rollback plan we created becoming invalid/stale). Though I'm not sure how feasible this is, since it may affect public APIs. > Can we ensure rollbacks are idempotent in case of repeated failures or > retries? Yes, making sure rollbacks are idempotent (in the sense that a pending rollback can keep on being retried until success) is a must. Both the current/original implementation and the proposed approach should address this. Both approaches/implementations handle the case where the rollback is pending but the instant to roll back is gone (where we need to re-materialize the instant info and tell the rollback execution to not actually delete instants). Also, in the proposed approach here, in step 3 we are directly using the pending rollback instant that we observe in the timeline, if one exists. Unfortunately the logic and my phrasing for step 3 are a bit awkward: because the caller can pass in a pendingRollbackInfo that it expects to be executed, I decided that we couldn't just ignore it, but rather we needed to validate that this pendingRollbackInfo is the same as the pending rollback instant we just saw in the timeline, and abort the rollback if not. > Worth considering edge cases where heartbeats could become stale or be missed > (e.g., if a job crashes without properly closing its heartbeat). Handling > these scenarios gracefully will help ensure that rollbacks can still proceed > when needed. Yeah, if a failed rollback job doesn't clean up the heartbeat after failure, any rollback attempt right after (within the interval of `[heartbeat timeout * (allowed heartbeat misses + 1)]`, I think) will fail.
The alternative (that I can think of) would be to simply allow for the chance of 2+ rollback jobs working on the same rollback instant. The issue though is that even if the HoodieTable executeRollback implementation prevents the dataset from being corrupted and will just produce a retry-able failure, I thought that it would be noisy/tricky for a user to understand/debug. So I decided to add Step 4, since from my perspective/understanding it was a tradeoff between "always failing with an easy-to-understand retry-able exception" versus "rarely failing with a hard-to-understand retry-able exception". Though I think the chance of a "stale heartbeat" can be lowered by updating the commit code path to clean the heartbeat if an exception is raised (this isn't done in 0.10 I think, but I'm not sure if it's still like this in 0.14). In fact, my organization internally does this with our internal forked/modified version of clustering replacecommit (we have modified replacecommit to not have an immutable plan, similar to commit). Though of course this isn't a guarantee, since a HUDI Spark job writer might fail with a lower-level runtime error. > Propose rollback implementation changes to guard against concurrent jobs > - > > Key: HUDI-6596 > URL:
[jira] [Updated] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs
[ https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Krishen Bhan updated HUDI-6596: --- Description:
h1. Issue
The existing rollback API in 0.14 [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877] executes a rollback plan, either taking in an existing rollback plan provided by the caller from a previous rollback attempt, or scheduling a new rollback instant if none is provided. Currently it is not safe for two concurrent jobs to call this API (when skipLocking=false and the callers aren't already holding a lock), as this can lead to an issue where multiple rollback requested plans are created or two jobs execute the same rollback instant at the same time.
h1. Proposed change
One way to resolve this issue is to refactor this rollback function such that if skipLocking=false, the following steps are followed
# Acquire the table lock
# Reload the active timeline
# Look at the active timeline to see if there is an inflight rollback instant from a previous rollback attempt; if it exists, assign this as the rollback plan to execute. Also, check if a pending rollback plan was passed in by the caller. Then execute the following steps depending on whether the caller passed a pending rollback instant plan.
## [a] If a pending inflight rollback plan was passed in by the caller, then check that there is a previous attempted rollback instant on the timeline (and that the instant times match) and continue to use this rollback plan. If that isn't the case, then raise a rollback exception, since this means another job has already executed this plan concurrently. Note that in a valid HUDI dataset there can be at most one rollback instant for a corresponding commit instant, which is why, if we no longer see a pending rollback in the timeline at this phase, we can safely assume it has already been executed to completion.
## [b] If no pending inflight rollback plan was passed in by the caller and no pending rollback instant was found in the timeline earlier, then schedule a new rollback plan
# Now that a rollback plan and requested rollback instant time have been assigned, check for an active heartbeat for the rollback instant time. If there is one, abort the rollback, as that means a concurrent job is executing that rollback. If not, start a heartbeat for that rollback instant time (see the sketch after this description).
# Release the table lock
# Execute the rollback plan and complete the rollback instant. Regardless of whether this succeeds or fails with an exception, close the heartbeat. This increases the chance that the next job that calls this rollback API will follow through with the rollback and not abort due to an active previous heartbeat
* These steps will only be enforced for skipLocking=false, since skipLocking=true means the caller may already be explicitly holding a table lock. In this case, acquiring the lock again in step (1) would fail.
* Acquiring a lock and reloading the timeline for (1-3) guards against data races where another job calls this rollback API at the same time and schedules its own rollback plan and instant. This is because, if no rollback has been attempted before for this instant, then before step (1) there is a window of time in which another concurrent rollback job could have scheduled a rollback plan, failed execution, and cleaned up its heartbeat, all while the current rollback job is running.
As a result, even if the current job was passed an empty pending rollback plan, it still needs to check the active timeline to ensure that no new pending rollback instant has been created.
* Using a heartbeat will signal to callers in other jobs that another job is already executing this rollback. Checking for an expired heartbeat and (re)starting the heartbeat has to be done under a lock, so that multiple jobs don't each start it at the same time and assume that they are the only ones heartbeating.
* The table lock is no longer needed after (5), since it can now be safely assumed that no other job (calling this rollback API) will execute this rollback instant.

One example implementation to achieve this:
{code:java}
@Deprecated
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo,
                        boolean skipLocking, Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
  final Timer.Context timerContext = this.metrics.getRollbackCtx();
  final Option<HoodieInstant> commitInstantOpt;
  final HoodieTable table;
  try {
    table = createTable(config, hadoopConf);
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to initialize table for rollback " + config.getBasePath()
        + " commits " + commitInstantTime, e);
  }
  final String rollbackInstantTime;
  final boolean
  // ... (remainder of the snippet truncated in the original message)
{code}
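To make step 4 above concrete (the heartbeat check and (re)start under the table lock), here is a minimal sketch. The heartbeatClient calls (isHeartbeatExpired, start) follow the shape of HUDI's heartbeat client, but the exact method names and signatures here are assumptions, not a definitive implementation:

{code:java}
// Sketch of step 4, executed while the table lock from step 1 is still held.
// heartbeatClient.isHeartbeatExpired/start follow the shape of HUDI's
// heartbeat client; treat the exact signatures as assumptions.
if (!heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
  // A live heartbeat means another job is already executing this same rollback.
  throw new HoodieRollbackException(
      "Another job is already executing rollback at instant " + rollbackInstantTime);
}
// Claim the rollback before releasing the lock, so no other job can start it.
heartbeatClient.start(rollbackInstantTime);
{code}

Doing both the expiry check and the start under the same lock is what prevents two jobs from each concluding they are the only one heartbeating.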
[jira] [Created] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs
Krishen Bhan created HUDI-6596: -- Summary: Propose rollback implementation changes to guard against concurrent jobs Key: HUDI-6596 URL: https://issues.apache.org/jira/browse/HUDI-6596 Project: Apache Hudi Issue Type: Wish Reporter: Krishen Bhan
h1. Issue
The existing rollback API in 0.14 [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877] executes a rollback plan, either taking in an existing rollback plan provided by the caller from a previous rollback attempt, or scheduling a new rollback instant if none is provided. Currently it is not safe for two concurrent jobs to call this API (when skipLocking=false and the callers aren't already holding a lock), as this can lead to an issue where multiple rollback requested plans are created or two jobs execute the same rollback instant at the same time.
h1. Proposed change
One way to resolve this issue is to refactor this rollback function such that if skipLocking=false, the following steps are followed
# Acquire the table lock
# Reload the active timeline
# Look at the active timeline to see if there is an inflight rollback instant from a previous rollback attempt; if it exists, assign this as the rollback plan to execute. Also, check if a pending rollback plan was passed in by the caller. Then make the following checks
## [a] If a pending inflight rollback plan was passed in by the caller, then check that there is a previous attempted rollback instant on the timeline (and that the instant times match) and use this rollback plan. If that isn't the case, then raise a rollback exception, since this means another job has already executed this plan concurrently.
## [b] If no pending inflight rollback plan was passed in by the caller, then schedule a new rollback plan if an existing one wasn't found in the active timeline.
# Now that a rollback plan and requested rollback instant time have been assigned, check for an active heartbeat for the rollback instant time. If there is one, abort the rollback, as that means a concurrent job is executing that rollback. If not, start a heartbeat for that rollback instant time.
# Release the table lock
# Execute the rollback plan and complete the rollback instant. Whether this succeeds or fails with an exception, close the heartbeat. This increases the chance that the next job that calls this rollback API will follow through with the rollback and not abort due to an active previous heartbeat
* These steps will only be enforced for skipLocking=false, since skipLocking=true means the caller may already be explicitly holding a table lock. In this case, acquiring the lock again in step (1) would fail.
* Acquiring a lock and reloading the timeline for (1-3) guards against data races where another job calls this rollback API at the same time and schedules its own rollback plan and instant. This is because, if no rollback has been attempted before for this instant, then before step (1) there is a window of time in which another concurrent rollback job could have scheduled a rollback plan, failed execution, and cleaned up its heartbeat, all while the current rollback job is running. As a result, even if the current job was passed an empty pending rollback plan, it still needs to check the active timeline to ensure that no new pending rollback instant has been created.
* Using a heartbeat will signal to callers in other jobs that another job is already executing this rollback.
Checking for an expired heartbeat and (re)starting the heartbeat has to be done under a lock, so that multiple jobs don't each start it at the same time and assume that they are the only ones heartbeating.
* The table lock is no longer needed after (5), since it can now be safely assumed that no other job (calling this rollback API) will execute this rollback instant.

One example implementation to achieve this:
{code:java}
@Deprecated
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo,
                        boolean skipLocking, Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
  final Timer.Context timerContext = this.metrics.getRollbackCtx();
  final Option<HoodieInstant> commitInstantOpt;
  final HoodieTable table;
  try {
    table = createTable(config, hadoopConf);
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to initialize table for rollback " + config.getBasePath()
        + " commits " + commitInstantTime, e);
  }
  final String rollbackInstantTime;
  final boolean deleteInstantsDuringRollback;
  final HoodieInstant instantToRollback;
  try {
    if (!skipLocking) {
      // Do step 1 and 2
      txnManager.beginTransaction();
      table.getMetaClient().reloadActiveTimeline();
    }
    final // ... (remainder of the snippet truncated in the original message)
{code}
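Continuing past the truncated snippet above, step 3's branching could look roughly like the sketch below. getPendingRollbackInstant and scheduleRollbackPlan are hypothetical placeholder helpers, not existing HUDI APIs, and the instant-time match against the caller's pendingRollbackInfo is elided for brevity:

{code:java}
// Sketch of step 3: reconcile the caller-provided pending rollback (if any)
// with what the freshly reloaded timeline shows. getPendingRollbackInstant and
// scheduleRollbackPlan are hypothetical helpers.
Option<HoodieInstant> pendingOnTimeline = getPendingRollbackInstant(table, commitInstantTime);
if (pendingRollbackInfo.isPresent()) {
  // [a] The caller's plan must still match a pending instant on the timeline;
  // otherwise another job already executed it (at most one rollback per commit).
  // (Instant-time matching against pendingRollbackInfo elided for brevity.)
  if (!pendingOnTimeline.isPresent()) {
    throw new HoodieRollbackException(
        "Rollback of " + commitInstantTime + " was already executed by a concurrent job");
  }
} else if (!pendingOnTimeline.isPresent()) {
  // [b] No plan passed in and none on the timeline: schedule a fresh rollback plan.
  scheduleRollbackPlan(table, commitInstantTime);
}
{code}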