[jira] [Created] (HUDI-7946) [Umbrella] RFC-79 : Improving reliability of concurrent table service executions and rollbacks

2024-07-02 Thread Krishen Bhan (Jira)
Krishen Bhan created HUDI-7946:
--

 Summary: [Umbrella] RFC-79 : Improving reliability of concurrent 
table service executions and rollbacks
 Key: HUDI-7946
 URL: https://issues.apache.org/jira/browse/HUDI-7946
 Project: Apache Hudi
  Issue Type: Epic
  Components: multi-writer, table-service
Reporter: Krishen Bhan


This is the umbrella ticket that tracks the overall implementation of RFC-79.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-05-15 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
---
Description: 
*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where:
 # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
Job 1, running before Job 2, can create a compaction plan covering all instant 
times up to (x) that doesn't include instant time (x-1). Later, Job 2 will 
create instant time (x-1), but the timeline will be in a corrupted state since 
the compaction plan was supposed to include (x-1).
 * There is a similar issue with clean. If Job 2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.
 * If the completed commit files include some sort of "checkpointing", with 
another "downstream job" performing incremental reads on this dataset (such as 
Hoodie Streamer/DeltaSync), then there may be incorrect behavior, such as the 
incremental reader skipping some completed commits (that have a smaller instant 
timestamp than the latest completed commit but were created after it).

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the operations of generating the 
instant time and creating the requested file in the same HUDI table transaction. 
Specifically, the following steps would be executed whenever any instant (commit, 
table service, etc.) is scheduled (see the sketch after the list):

Approach A
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp (that is 
greater than C)
 # Release table lock
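
The following is a rough, illustrative sketch of (A). The Timeline and 
PlanGenerator interfaces and the numeric-timestamp arithmetic are assumptions 
for illustration, not actual Hudi classes or APIs:
{code:java}
import java.util.concurrent.locks.ReentrantLock;

// Sketch of Approach A: timestamp generation, plan computation, and the
// .requested file creation all happen inside one table-level transaction.
class ApproachASketch {
  interface Timeline {
    String lastInstantTime();                       // latest instant C on the active timeline (completed or not)
    void createRequestedFile(String instantTime, Object plan);
  }
  interface PlanGenerator {
    Object computePlan(String instantTime);         // may be expensive
  }

  private final ReentrantLock tableLock = new ReentrantLock();

  String schedule(Timeline timeline, PlanGenerator planGen) {
    tableLock.lock();                               // 1. acquire table lock
    try {
      String latest = timeline.lastInstantTime();   // 2. look at the latest instant C
      // generate a timestamp strictly greater than C (assumes numeric instant times)
      String newInstantTime = String.valueOf(Long.parseLong(latest) + 1);
      Object plan = planGen.computePlan(newInstantTime);   // 3. plan is computed while the lock is held (a drawback noted below)
      timeline.createRequestedFile(newInstantTime, plan);  //    .requested file written in the same transaction
      return newInstantTime;
    } finally {
      tableLock.unlock();                           // 4. release table lock
    }
  }
}
{code}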

Unfortunately, (A) has the following drawbacks:
 * Every operation must now hold the table lock when computing its plan, even if 
it's an expensive operation that will take a while
 * Users of HUDI cannot easily set their own instant time of an operation, and 
this restriction would break any public APIs that allow this and would require 
deprecating those APIs.

 

An alternate approach is to have every operation abort creating a .requested 
file unless it has the latest timestamp. Specifically, for any instant type, 
whenever an operation is about to create a .requested plan on the timeline, it 
should take the table lock and assert that there are no other instants on the 
timeline greater than it that could cause a conflict. If that assertion fails, 
then throw a retryable conflict resolution exception.

Specifically, the following steps should be followed whenever any instant 
(commit, table service, etc.) is scheduled (see the sketch after the list):

Approach B
 # Acquire table lock. Assume that the desired instant time C and requested 
file plan metadata have already been created, regardless of whether it was 
before this step or right after acquiring the table lock.
 # If there are any instants on the timeline that are greater than C 
(regardless of their operation type or state), then release the table lock and 
throw an exception
 # Create requested plan on timeline (as usual)
 # Release table lock
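
A minimal sketch of (B), assuming the plan for instant C has already been 
computed; the Timeline interface and exception class are stand-ins, not actual 
Hudi types:
{code:java}
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of Approach B: only the conflict check and the .requested file
// creation happen under the table lock; plan computation can happen before.
class ApproachBSketch {
  interface Timeline {
    List<String> instantTimesAfter(String instantTime);   // all instants > C, any action or state
    void createRequestedFile(String instantTime, Object plan);
  }
  static class RetryableConflictException extends RuntimeException {
    RetryableConflictException(String msg) { super(msg); }
  }

  private final ReentrantLock tableLock = new ReentrantLock();

  void createRequested(Timeline timeline, String instantC, Object plan) {
    tableLock.lock();                                      // 1. acquire table lock
    try {
      // 2. abort if any instant with a greater timestamp already exists
      if (!timeline.instantTimesAfter(instantC).isEmpty()) {
        throw new RetryableConflictException("Newer instant exists; retry scheduling " + instantC);
      }
      timeline.createRequestedFile(instantC, plan);        // 3. create the .requested plan as usual
    } finally {
      tableLock.unlock();                                  // 4. release table lock
    }
  }
}
{code}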

Unlike (A), this approach (B) allows users to continue to use HUDI APIs where 
the caller can specify the instant time (avoiding the need to deprecate any 
public API). It also allows the possibility of table service operations 
computing their plan without holding a lock. Despite this though, (B) has the 
following drawbacks:
 * It is not immediately clear how MDT vs base table operations should be 
handled here. Do we need to update (2) to consider both base table and MDT 
timelines (rather than just MDT)?
 * This error will still be thrown even for scenarios of concurrent operations 
where it would be safe to continue. For example, assume two ingestion writers 
executing on a dataset, each only performing an insert commit on the 
dataset (with no compact/clean being scheduled on the MDT). Additionally, assume 
there is no "downstream" job performing incremental reads on this dataset. If 
the writer that started scheduling later ends up having an earlier timestamp, 
it would still be safe for it to continue. Despite that, because of 

[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-05-06 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
---
Description: 
*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where:
 # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
Job 1, running before Job 2, can create a compaction plan covering all instant 
times up to (x) that doesn't include instant time (x-1). Later, Job 2 will 
create instant time (x-1), but the timeline will be in a corrupted state since 
the compaction plan was supposed to include (x-1).
 * There is a similar issue with clean. If Job 2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the operations of generating the 
instant time and creating the requested file in the same HUDI table transaction. 
Specifically, the following steps would be executed whenever any instant (commit, 
table service, etc.) is scheduled:

Approach A
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp (that is 
greater than C)
 # Release table lock

Unfortunately, (A) has the following drawbacks:
 * Every operation must now hold the table lock when computing its plan, even if 
it's an expensive operation that will take a while
 * Users of HUDI cannot easily set their own instant time of an operation, and 
this restriction would break any public APIs that allow this and would require 
deprecating those APIs.

 

An alternate approach is to have every operation abort creating a .requested 
file unless it has the latest timestamp. Specifically, for any instant type, 
whenever an operation is about to create a .requested plan on the timeline, it 
should take the table lock and assert that there are no other instants on the 
timeline greater than it that could cause a conflict. If that assertion fails, 
then throw a retryable conflict resolution exception.

Specifically, the following steps should be followed whenever any instant 
(commit, table service, etc.) is scheduled (see the sketch after the list):

Approach B
 # Acquire table lock. Assume that the desired instant time C and requested 
file plan metadata have already been created, regardless of whether it was 
before this step or right after acquiring the table lock.
 # Get the set of all instants on the timeline that are greater than C 
(regardless of their operation type or state).
 ## If the current operation is an "ingestion" type 
(commit/deltacommit/insert_overwrite/replacecommit) then assert the set is 
empty. This is because another "ingestion" operation with a later instant time 
might schedule and execute a compaction at said instant time in the MDT, leaving 
the table in the aforementioned situation where a compaction on the MDT is 
scheduled after an inflight ingestion commit.
 ## If the current operation is a "table service" (clean/compaction/cluster) 
then assert that the set doesn't contain any table service instant types 
(clean/compaction/cluster).
 # Create requested plan on timeline (as usual)
 # Release table lock
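
A hedged sketch of the per-action check in step 2; the action-name strings and 
the unchecked exception are illustrative placeholders, not exact Hudi 
identifiers:
{code:java}
import java.util.List;
import java.util.Set;

// Sketch of step 2: the conflict rule depends on whether the current
// operation is an ingestion commit or a table service.
class ConflictCheckSketch {
  static final Set<String> TABLE_SERVICE_ACTIONS = Set.of("clean", "compaction", "clustering");

  static void assertNoConflict(String currentAction, List<String> actionsAfterC) {
    if (!TABLE_SERVICE_ACTIONS.contains(currentAction)) {
      // 2.1 ingestion (commit/deltacommit/insert_overwrite/replacecommit):
      //     any later instant might schedule an MDT compaction, so the set must be empty
      if (!actionsAfterC.isEmpty()) {
        throw new RuntimeException("Later instants exist on the timeline; abort and retry");
      }
    } else {
      // 2.2 table service (clean/compaction/cluster):
      //     only later table service instants are treated as conflicts
      if (actionsAfterC.stream().anyMatch(TABLE_SERVICE_ACTIONS::contains)) {
        throw new RuntimeException("Later table service instant exists; abort and retry");
      }
    }
  }
}
{code}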

Unlike (A), this approach (B) allows users to continue to use HUDI APIs where 
the caller can specify the instant time (avoiding the need to deprecate any 
public API). It also allows the possibility of table service operations 
computing their plan without holding a lock. Despite this though, (B) has the 
following drawbacks:
 * It is not immediately clear how MDT vs base table operations should be 
handled here. Do we need to update (2) to build its set from both base table 
and MDT timelines (rather than just MDT)?
 * This error will still be thrown even for scenarios of concurrent operations 
where it would be safe to continue. For example, assume two ingestion writers 
executing on a dataset, each only performing an insert commit on the 
dataset (with no table service being scheduled on the MDT). If the writer that 
started scheduling later ends up having an earlier timestamp, 

[jira] [Created] (HUDI-7687) Instant should not be archived until replaced file groups or older file versions are deleted

2024-04-29 Thread Krishen Bhan (Jira)
Krishen Bhan created HUDI-7687:
--

 Summary: Instant should not be archived until replaced file groups 
or older file versions are deleted
 Key: HUDI-7687
 URL: https://issues.apache.org/jira/browse/HUDI-7687
 Project: Apache Hudi
  Issue Type: Improvement
Reporter: Krishen Bhan


When archival runs, it may consider an instant as a candidate for archival even 
if the file groups said instant replaced/updated still need to undergo a 
`clean`. For example, consider the following scenario with clean and archival 
scheduled/executed independently in different jobs:
 # Insert at C1 creates file group f1 in partition
 # Replacecommit at RC2 creates file group f2 in partition, and replaces f1
 # Any reader of partition that calls HUDI API (with or without using MDT) will 
recognize that f1 should be ignored, as it has been replaced. This is since RC2 
instant file is in active timeline
 # Some more instants are added to timeline. RC2 is now eligible to be cleaned 
(as per the table writers' clean policy). Assume though that file groups 
replaced by RC2 haven't been deleted yet, such as due to clean repeatedly 
failing, async clean not being scheduled yet, or the clean failing to delete 
said file groups.
 # An archive job eventually is triggered, and archives C1 and RC2. Note that 
f1 is still in partition

Now the table has the same consistency issue as seen in 
https://issues.apache.org/jira/browse/HUDI-7655 , where replaced file groups 
are still in partition and readers may see inconsistent data. 

 

This situation can be avoided by ensuring that archival will "block" and not go 
past an older instant time if it sees that said instant hasn't undergone a clean 
yet.
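
A minimal sketch of such an archival guard, assuming a hypothetical 
isFullyCleaned predicate (not an existing Hudi API) that reports whether the 
file groups replaced or obsoleted by an instant have all been deleted:
{code:java}
import java.util.List;

// Sketch: archival stops at the first candidate instant whose replaced/older
// file versions have not yet been cleaned, and archives nothing past it.
class ArchivalGuardSketch {
  interface CleanState {
    boolean isFullyCleaned(String instantTime);      // e.g. all file groups replaced by RC2 are deleted
  }

  static List<String> instantsSafeToArchive(List<String> candidatesInOrder, CleanState cleanState) {
    int safeCount = 0;
    for (String instant : candidatesInOrder) {
      if (!cleanState.isFullyCleaned(instant)) {
        break;                                       // block archival at this instant and everything after it
      }
      safeCount++;
    }
    return candidatesInOrder.subList(0, safeCount);
  }
}
{code}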



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7655) Support configuration for clean to fail execution if there is at least one file is marked as a failed delete

2024-04-29 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7655:
---
Description: 
When a HUDI clean plan is executed, any targeted file that was not confirmed as 
deleted (or non-existing) will be marked as a "failed delete". Although these 
failed deletes will be added to `.clean` metadata, if incremental clean is used 
then these files might never be picked up again by a future clean plan, 
unless a "full-scan" clean ends up being scheduled. In addition to leading to 
more files unnecessarily taking up storage space for longer, this can lead to 
the following dataset consistency issue for COW datasets:
 # Insert at C1 creates file group f1 in partition
 # Replacecommit at RC2 creates file group f2 in partition, and replaces f1
 # Any reader of partition that calls HUDI API (with or without using MDT) will 
recognize that f1 should be ignored, as it has been replaced. This is since RC2 
instant file is in active timeline
 # Some completed instants later, an incremental clean is scheduled. It moves 
the "earliest commit to retain" to a time after instant time RC2, so it 
targets f1 for deletion. But during execution of the plan, it fails to delete 
f1.
 # An archive job eventually is triggered, and archives C1 and RC2. Note that 
f1 is still in partition

At this point, any job/query that reads the aforementioned partition directly 
via DFS file system calls (without using the MDT FILES partition) 
will consider both f1 and f2 as valid file groups, since RC2 is no longer in the 
active timeline. This is a data consistency issue, and will only be resolved if 
a "full-scan" clean is triggered and deletes f1.

This specific scenario can be avoided if the user can configure HUDI clean to 
fail execution of a clean plan unless all files are confirmed as deleted (or 
not existing in DFS already), "blocking" the clean. The next clean attempt will 
re-execute this existing plan, since clean plans cannot be "rolled back". 
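
A minimal sketch of the proposed behavior; the boolean config and the helper 
below are hypothetical illustrations, not existing Hudi options or APIs:
{code:java}
import java.util.List;

// Sketch: when the (hypothetical) fail-on-failed-delete config is enabled,
// clean execution throws instead of completing, so the next clean attempt
// re-executes the same plan (clean plans cannot be rolled back).
class FailOnFailedDeleteSketch {
  static void finalizeClean(boolean failOnFailedDelete, List<String> failedDeletePaths) {
    if (failOnFailedDelete && !failedDeletePaths.isEmpty()) {
      throw new RuntimeException("Clean failed to delete " + failedDeletePaths.size()
          + " file(s), e.g. " + failedDeletePaths.get(0));
    }
    // otherwise proceed to write the completed .clean metadata as today
  }
}
{code}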

  was:
When a HUDI clean plan is executed, any targeted file that was not confirmed as 
deleted (or non-existing) will be marked as a "failed delete". Although these 
failed deletes will be added to `.clean` metadata, if incremental clean is used 
then these files might not ever be picked up again as a future clean plan, 
unless a "full-scan" clean ends up being scheduled. In addition to leading to 
more files unnecessarily taking up storage space for longer, this can lead to 
the following dataset consistency issue for COW datasets:
 # Insert at C1 creates file group f1 in partition
 # Replacecommit at RC2 creates file group f2 in partition, and replaces f1
 # Any reader of partition that calls HUDI API (with or without using MDT) will 
recognize that f1 should be ignored, as it has been replaced. This is since RC2 
instant file is in active timeline
 # Some completed instants later an incremental clean is scheduled. It moves 
the "earliest commit to retain" to an time after instant time RC2, so it 
targets f1 for deletion. But during execution of the plan, it fails to delete 
f1.
 # An archive job eventually is triggered, and archives C1. Note that f1 is 
still in partition

At this point, any job/query that reads the aforementioned partition directly 
from the DFS file system calls (without directly using MDT FILES partition) 
will consider both f1 and f2 as valid file groups, since RC2 is no longer in 
active timeline. This is a data consistency issue, and will only be resolved if 
a "full-scan" clean is triggered and deletes f1.

This specific scenario can be avoided if the user can configure HUDI clean to 
fail execution of a clean plan unless all files are confirmed as deleted (or 
not existing in DFS already), "blocking" the clean. The next clean attempt will 
re-execute this existing plan, since clean plans cannot be "rolled back". 


> Support configuration for clean to fail execution if there is at least one 
> file is marked as a failed delete
> 
>
> Key: HUDI-7655
> URL: https://issues.apache.org/jira/browse/HUDI-7655
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Krishen Bhan
>Priority: Minor
>  Labels: clean
>
> When a HUDI clean plan is executed, any targeted file that was not confirmed 
> as deleted (or non-existing) will be marked as a "failed delete". Although 
> these failed deletes will be added to `.clean` metadata, if incremental clean 
> is used then these files might not ever be picked up again as a future clean 
> plan, unless a "full-scan" clean ends up being scheduled. In addition to 
> leading to more files unnecessarily taking up storage space for longer, this 
> can lead to the following dataset consistency issue for COW 

[jira] [Created] (HUDI-7655) Support configuration for clean to fail execution if there is at least one file is marked as a failed delete

2024-04-23 Thread Krishen Bhan (Jira)
Krishen Bhan created HUDI-7655:
--

 Summary: Support configuration for clean to fail execution if 
there is at least one file is marked as a failed delete
 Key: HUDI-7655
 URL: https://issues.apache.org/jira/browse/HUDI-7655
 Project: Apache Hudi
  Issue Type: Improvement
Reporter: Krishen Bhan


When a HUDI clean plan is executed, any targeted file that was not confirmed as 
deleted (or non-existing) will be marked as a "failed delete". Although these 
failed deletes will be added to `.clean` metadata, if incremental clean is used 
then these files might never be picked up again by a future clean plan, 
unless a "full-scan" clean ends up being scheduled. In addition to leading to 
more files unnecessarily taking up storage space for longer, this can lead to 
the following dataset consistency issue for COW datasets:
 # Insert at C1 creates file group f1 in partition
 # Replacecommit at RC2 creates file group f2 in partition, and replaces f1
 # Any reader of partition that calls HUDI API (with or without using MDT) will 
recognize that f1 should be ignored, as it has been replaced. This is since RC2 
instant file is in active timeline
 # Some completed instants later, an incremental clean is scheduled. It moves 
the "earliest commit to retain" to a time after instant time RC2, so it 
targets f1 for deletion. But during execution of the plan, it fails to delete 
f1.
 # An archive job eventually is triggered, and archives C1. Note that f1 is 
still in partition

At this point, any job/query that reads the aforementioned partition directly 
via DFS file system calls (without using the MDT FILES partition) 
will consider both f1 and f2 as valid file groups, since RC2 is no longer in the 
active timeline. This is a data consistency issue, and will only be resolved if 
a "full-scan" clean is triggered and deletes f1.

This specific scenario can be avoided if the user can configure HUDI clean to 
fail execution of a clean plan unless all files are confirmed as deleted (or 
not existing in DFS already), "blocking" the clean. The next clean attempt will 
re-execute this existing plan, since clean plans cannot be "rolled back". 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-04-19 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
---
Description: 
*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where:
 # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
Job 1, running before Job 2, can create a compaction plan covering all instant 
times up to (x) that doesn't include instant time (x-1). Later, Job 2 will 
create instant time (x-1), but the timeline will be in a corrupted state since 
the compaction plan was supposed to include (x-1).
 * There is a similar issue with clean. If Job 2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the operations of generating the 
instant time and creating the requested file in the same HUDI table transaction. 
Specifically, the following steps would be executed whenever any instant (commit, 
table service, etc.) is scheduled:

Approach A
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp (that is 
greater than C)
 # Release table lock

Unfortunately, (A) has the following drawbacks:
 * Every operation must now hold the table lock when computing its plan, even if 
it's an expensive operation that will take a while
 * Users of HUDI cannot easily set their own instant time of an operation, and 
this restriction would break any public APIs that allow this and would require 
deprecating those APIs.

 

An alternate approach is to have every operation abort creating a .requested 
file unless it has the latest timestamp. Specifically, for any instant type, 
whenever an operation is about to create a .requested plan on the timeline, it 
should take the table lock and assert that there are no other instants on the 
timeline greater than it that could cause a conflict. If that assertion fails, 
then throw a retryable conflict resolution exception.

Specifically, the following steps should be followed whenever any instant 
(commit, table service, etc.) is scheduled:

Approach B
 # Acquire table lock. Assume that the desired instant time C and requested 
file plan metadata have already been created, regardless of whether it was 
before this step or right after acquiring the table lock.
 # Get the set of all instants on the timeline that are greater than C 
(regardless of their action or state).
 ## If the current operation is an ingestion type 
(commit/deltacommit/insert_overwrite/replacecommit) then assert the set is empty
 ## If the current operation is a table service then assert that the set 
doesn't contain any table service instant types
 # Create requested plan on timeline (as usual)
 # Release table lock

Unlike (A), this approach (B) allows users to continue to use HUDI APIs where 
the caller can specify the instant time (avoiding the need to deprecate any 
public API). It also allows the possibility of table service operations 
computing their plan without holding a lock. Despite this though, (B) has the 
following drawbacks:
 * It is not immediately clear how MDT vs base table operations should be 
handled here. At first glance it seems that at step (2) both the base table and 
MDT timeline should be checked, but that might need more investigation to 
confirm.
 * This error will still be thrown even for combinations of concurrent 
operations where it would be safe to continue. For example, assume two 
ingestion writers executing on a dataset, each only performing an 
insert commit on the dataset (with no table service being scheduled). If the 
writer that started scheduling later ends up having an earlier timestamp, it 
would still be safe for it to continue. Despite that, because of step (2.1) it 
would still have to abort and throw an error. This means that on datasets with 
many frequent concurrent ingestion commits and very infrequent table service 
operations, there would be a lot of transient failures/noise 

[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-04-05 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
---
Description: 
*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where:
 # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
Job 1, running before Job 2, can create a compaction plan covering all instant 
times up to (x) that doesn't include instant time (x-1). Later, Job 2 will 
create instant time (x-1), but the timeline will be in a corrupted state since 
the compaction plan was supposed to include (x-1).
 * There is a similar issue with clean. If Job 2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the operations of generating the 
instant time and creating the requested file in the same HUDI table transaction. 
Specifically, the following steps would be executed whenever any instant (commit, 
table service, etc.) is scheduled:

Approach A
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp (that is 
greater than C)
 # Release table lock

Unfortunately, (A) has the following drawbacks:
 * Every operation must now hold the table lock when computing its plan, even if 
it's an expensive operation that will take a while
 * Users of HUDI cannot easily set their own instant time of an operation, and 
this restriction would break any public APIs that allow this and would require 
deprecating those APIs.

 

An alternate approach is to have every operation abort creating a .requested 
file unless it has the latest timestamp. Specifically, for any instant type, 
whenever an operation is about to create a .requested plan on the timeline, it 
should take the table lock and assert that there are no other instants on the 
timeline (inflight or otherwise) that are greater than it. If that assertion 
fails, then throw a retryable conflict resolution exception.

Specifically, the following steps should be followed whenever any instant 
(commit, table service, etc.) is scheduled:

Approach B
 # Acquire table lock. Assume that the desired instant time C and requested 
file plan metadata have already been created, regardless of whether it was 
before this step or right after acquiring the table lock.
 # Check if there are any instant files on the timeline greater than C (regardless 
of their action or state). If so, raise a custom exception
 # Create requested plan on timeline (as usual)
 # Release table lock

Unlike (A), this approach (B) allows users to continue to use HUDI APIs where 
the caller can specify the instant time (avoiding the need to deprecate any 
public API). It also allows the possibility of table service operations 
computing their plan without holding a lock. Despite this though, (B) has the 
following drawbacks:
 * It is not immediately clear how MDT vs base table operations should be 
handled here. At first glance it seems that at step (2) both the base table and 
MDT timeline should be checked, but that might need more investigation to 
confirm.
 * This error will still be thrown even for combinations of concurrent 
operations where it would be safe to continue. For example, assume two 
ingestion writers executing on a dataset, each only performing an 
insert commit on the dataset (with no table service being scheduled). If the 
writer that started scheduling later ends up having an earlier timestamp, it 
would still be safe for it to continue. Despite that, because of step (2) it 
would still have to abort and throw an error. This means that on datasets with 
many frequent concurrent ingestion commits and very infrequent table service 
operations, there would be a lot of transient failures/noise by failing 
writers. This step (2) could potentially be revised to avoid this scenario (by 
only checking for certain actions like table services) but that would add 
complexity and it is not clear at first glance if that would 


[jira] [Updated] (HUDI-7503) concurrent executions of table service plan should not corrupt dataset

2024-04-02 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7503:
---
Summary: concurrent executions of table service plan should not corrupt 
dataset  (was: concurrent executions of compaction plan should not corrupt 
dataset)

> concurrent executions of table service plan should not corrupt dataset
> --
>
> Key: HUDI-7503
> URL: https://issues.apache.org/jira/browse/HUDI-7503
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: compaction, table-service
>Reporter: Krishen Bhan
>Priority: Minor
>
> Currently it is not safe for 2+ writers to concurrently call 
> `org.apache.hudi.client.BaseHoodieTableServiceClient#compact` on the same 
> compaction instant. This is since one writer might execute the instant and 
> create an inflight, while the other writer sees the inflight and tries to 
> roll it back before re-attempting to execute it (since it will assume said 
> inflight was a previously failed compaction attempt).
> This logic should be updated such that only one writer will actually execute 
> the compaction plan at a time (and the others will fail/abort).
> One approach is to use a transaction (base table lock) in conjunction with 
> heartbeating, to ensure that the writer triggers a heartbeat before executing 
> compaction, and any concurrent writers will use the heartbeat to check whether 
> the compaction is currently being executed by another writer. Specifically, 
> the compact API should execute the following steps:
>  # Get the instant to compact C (as usual)
>  # Start a transaction
>  # Check if C has an active heartbeat; if so, finish the transaction and throw 
> an exception
>  # Start a heartbeat for C (this will implicitly re-start the heartbeat if it 
> has been started before by another job)
>  # Finish transaction
>  # Run the existing compact API logic on C 
>  # If execution succeeds, clean up the heartbeat file. If it fails, do nothing 
> (as the heartbeat will anyway be automatically expired later).
> Note that this approach only holds the table lock temporarily, when 
> checking/starting the heartbeat
> Also, this flow can be applied to execution of clean plans and other table 
> services
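
A rough sketch of the heartbeat-guarded compact flow described above; the 
Heartbeats and Compactor interfaces are stand-ins for Hudi's heartbeat client 
and BaseHoodieTableServiceClient#compact, not their actual signatures:
{code:java}
import java.util.concurrent.locks.ReentrantLock;

// Sketch: the table lock is held only while checking/starting the heartbeat;
// compaction itself runs outside the lock.
class GuardedCompactionSketch {
  interface Heartbeats {
    boolean isActive(String instantTime);
    void start(String instantTime);      // implicitly restarts an expired heartbeat
    void stop(String instantTime);       // removes the heartbeat file
  }
  interface Compactor {
    void compact(String instantTime);    // existing compact logic
  }

  private final ReentrantLock tableLock = new ReentrantLock();

  void compactGuarded(String instantC, Heartbeats heartbeats, Compactor compactor) {
    tableLock.lock();                    // 2. start a transaction
    try {
      if (heartbeats.isActive(instantC)) {
        throw new IllegalStateException( // 3. another writer is already executing this plan
            "Compaction " + instantC + " is already being executed by another writer");
      }
      heartbeats.start(instantC);        // 4. claim the instant
    } finally {
      tableLock.unlock();                // 5. finish the transaction
    }
    compactor.compact(instantC);         // 6. run the existing compact logic outside the lock
    heartbeats.stop(instantC);           // 7. on success clean up the heartbeat; on failure it simply expires
  }
}
{code}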



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-03-29 Thread Krishen Bhan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832306#comment-17832306
 ] 

Krishen Bhan commented on HUDI-7507:


I removed the original alternate approach I added in the ticket description since 
it can still result in problematic edge cases.
{code:java}
An alternate approach (suggested by Prashant Wason ) was to instead have all 
operations including table services perform conflict resolution checks before 
committing. For example, clean and compaction would generate their plan as 
usual. But when creating a transaction to write a .requested file, right before 
creating the file they should check if another lower timestamp instant has 
appeared in the timeline. And if so, they should fail/abort without creating 
the plan. Commit operations would also be updated/verified to have similar 
check, before creating a .requested file (during a transaction) the commit 
operation will check if a table service plan (clean/compact) with a greater 
instant time has been created. And if so, would abort/fail. This avoids the 
drawbacks of the first approach, but will lead to more transient failures that 
users have to handle. {code}
Specifically, it can still result in an inflight commit on base table failing 
while a completed compaction on MDT with a greater timestamp is present.

>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: table-service
>Reporter: Krishen Bhan
>Priority: Major
> Attachments: Flowchart (1).png, Flowchart.png
>
>
> Although HUDI operations hold a table lock when creating a .requested 
> instant, because HUDI writers do not generate a timestamp and create a 
> .requested plan in the same transaction, there can be a scenario where 
>  # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp 
> (x - 1)
>  # Job 1 schedules and creates requested file with instant timestamp (x)
>  # Job 2 schedules and creates requested file with instant timestamp (x-1)
>  # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can 
> cause issues:
>  * 
>  ** If Job 2 is ingestion commit and Job 1 is compaction/log compaction, then 
> when Job 1 runs before Job 2 and can create a compaction plan for all instant 
> times (up to (x) ) that doesn’t include instant time (x-1) .  Later Job 2 
> will create instant time (x-1), but timeline will be in a corrupted state 
> since compaction plan was supposed to include (x-1)
>  ** There is a similar issue with clean. If Job2 is a long-running commit 
> (that was stuck/delayed for a while before creating its .requested plan) and 
> Job 1 is a clean, then Job 1 can perform a clean that updates the 
> earliest-commit-to-retain without waiting for the inflight instant by Job 2 
> at (x-1) to complete. This causes Job2 to be "skipped" by clean.
> [Edit] I added a diagram to visualize the issue, specifically the second 
> scenario with clean
> !Flowchart (1).png!
>  
> One way this can be resolved is by combining the operations of generating 
> instant time and creating a requested file in the same HUDI table 
> transaction. Specifically, executing the following steps whenever any instant 
> (commit, table service, etc) is scheduled
>  # Acquire table lock
>  # Look at the latest instant C on the active timeline (completed or not). 
> Generate a timestamp after C
>  # Create the plan and requested file using this new timestamp ( that is 
> greater than C)
>  # Release table lock
> Unfortunately this has the following drawbacks
>  * Every operation must now hold the table lock when computing its plan, even 
> if it's an expensive operation and will take a while
>  * Users of HUDI cannot easily set their own instant time of an operation, 
> and this restriction would break any public APIs that allow this
> -An alternate approach (suggested by- [~pwason] -) was to instead have all 
> operations including table services perform conflict resolution checks before 
> committing. For example, clean and compaction would generate their plan as 
> usual. But when creating a transaction to write a .requested file, right 
> before creating the file they should check if another lower timestamp instant 
> has appeared in the timeline. And if so, they should fail/abort without 
> creating the plan. Commit operations would also be updated/verified to have 
> similar check, before creating a .requested file (during a transaction) the 
> commit operation will check if a table service plan (clean/compact) with a 
> greater instant time has been created. And if so, 

[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-03-29 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
---
Description: 
Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where:
 # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
Job 1, running before Job 2, can create a compaction plan covering all instant 
times up to (x) that doesn't include instant time (x-1). Later, Job 2 will 
create instant time (x-1), but the timeline will be in a corrupted state since 
the compaction plan was supposed to include (x-1).
 * There is a similar issue with clean. If Job 2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart (1).png!

 

One way this can be resolved is by combining the operations of generating the 
instant time and creating the requested file in the same HUDI table transaction. 
Specifically, the following steps would be executed whenever any instant (commit, 
table service, etc.) is scheduled:
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp (that is 
greater than C)
 # Release table lock

Unfortunately, this has the following drawbacks:
 * Every operation must now hold the table lock when computing its plan, even 
if it's an expensive operation and will take a while
 * Users of HUDI cannot easily set their own instant time of an operation, and 
this restriction would break any public APIs that allow this

-An alternate approach (suggested by- [~pwason] -) was to instead have all 
operations including table services perform conflict resolution checks before 
committing. For example, clean and compaction would generate their plan as 
usual. But when creating a transaction to write a .requested file, right before 
creating the file they should check if another lower timestamp instant has 
appeared in the timeline. And if so, they should fail/abort without creating 
the plan. Commit operations would also be updated/verified to have similar 
check, before creating a .requested file (during a transaction) the commit 
operation will check if a table service plan (clean/compact) with a greater 
instant time has been created. And if so, would abort/fail. This avoids the 
drawbacks of the first approach, but will lead to more transient failures that 
users have to handle.-

 

An alternate approach is to have every operation abort creating a .requested 
file unless it has the latest timestamp. Specifically, for any instant type, 
whenever an operation is about to create a .requested plan on the timeline, it 
should take the table lock and assert that there are no other instants on the 
timeline (inflight or otherwise) that are greater than it. If that assertion 
fails, then throw a retryable conflict resolution exception.

  was:
Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where 
 # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp (x 
- 1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * 
 ** If Job 2 is ingestion commit and Job 1 is compaction/log compaction, then 
when Job 1 runs before Job 2 and can create a compaction plan for all instant 
times (up to (x) ) that doesn’t include instant time (x-1) .  Later Job 2 will 
create instant time (x-1), but timeline will be in a corrupted state since 
compaction plan was supposed to include (x-1)
 ** There is a similar issue with clean. If Job2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant 

[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-03-26 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
---
Description: 
Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where:
 # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x-1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
Job 1, running before Job 2, can create a compaction plan covering all instant 
times up to (x) that doesn't include instant time (x-1). Later, Job 2 will 
create instant time (x-1), but the timeline will be in a corrupted state since 
the compaction plan was supposed to include (x-1).
 * There is a similar issue with clean. If Job 2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart (1).png!

 

One way this can be resolved is by combining the operations of generating the 
instant time and creating the requested file in the same HUDI table transaction. 
Specifically, the following steps would be executed whenever any instant (commit, 
table service, etc.) is scheduled:
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp (that is 
greater than C)
 # Release table lock

Unfortunately, this has the following drawbacks:
 * Every operation must now hold the table lock when computing its plan, even 
if it's an expensive operation and will take a while
 * Users of HUDI cannot easily set their own instant time of an operation, and 
this restriction would break any public APIs that allow this

An alternate approach (suggested by [~pwason] ) was to instead have all 
operations including table services perform conflict resolution checks before 
committing. For example, clean and compaction would generate their plan as 
usual. But when creating a transaction to write a .requested file, right before 
creating the file they should check if another lower timestamp instant has 
appeared in the timeline. And if so, they should fail/abort without creating 
the plan. Commit operations would also be updated/verified to have a similar 
check: before creating a .requested file (during a transaction), the commit 
operation will check if a table service plan (clean/compact) with a greater 
instant time has been created. And if so, would abort/fail. This avoids the 
drawbacks of the first approach, but will lead to more transient failures that 
users have to handle.

  was:
Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where 
 # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp (x 
- 1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * 
 ** If Job 2 is ingestion commit and Job 1 is compaction/log compaction, then 
when Job 1 runs before Job 2 and can create a compaction plan for all instant 
times (up to (x) ) that doesn’t include instant time (x-1) .  Later Job 2 will 
create instant time (x-1), but timeline will be in a corrupted state since 
compaction plan was supposed to include (x-1)
 ** There is a similar issue with clean. If Job2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart.png!

 

One way this can be resolved is by combining the operations of generating 
instant time and creating a requested file in the same HUDI table transaction. 
Specifically, executing the following steps whenever any instant (commit, table 
service, etc) is scheduled
 

[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-03-26 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
---
Attachment: Flowchart (1).png

>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Krishen Bhan
>Priority: Major
> Attachments: Flowchart (1).png, Flowchart.png
>
>
> Although HUDI operations hold a table lock when creating a .requested 
> instant, because HUDI writers do not generate a timestamp and create a 
> .requested plan in the same transaction, there can be a scenario where 
>  # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp 
> (x - 1)
>  # Job 1 schedules and creates requested file with instant timestamp (x)
>  # Job 2 schedules and creates requested file with instant timestamp (x-1)
>  # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can 
> cause issues:
>  * 
>  ** If Job 2 is ingestion commit and Job 1 is compaction/log compaction, then 
> when Job 1 runs before Job 2 and can create a compaction plan for all instant 
> times (up to (x) ) that doesn’t include instant time (x-1) .  Later Job 2 
> will create instant time (x-1), but timeline will be in a corrupted state 
> since compaction plan was supposed to include (x-1)
>  ** There is a similar issue with clean. If Job2 is a long-running commit 
> (that was stuck/delayed for a while before creating its .requested plan) and 
> Job 1 is a clean, then Job 1 can perform a clean that updates the 
> earliest-commit-to-retain without waiting for the inflight instant by Job 2 
> at (x-1) to complete. This causes Job2 to be "skipped" by clean.
> [Edit] I added a diagram to visualize the issue, specifically the second 
> scenario with clean
> !Flowchart.png!
>  
> One way this can be resolved is by combining the operations of generating 
> instant time and creating a requested file in the same HUDI table 
> transaction. Specifically, executing the following steps whenever any instant 
> (commit, table service, etc) is scheduled
>  # Acquire table lock
>  # Look at the latest instant C on the active timeline (completed or not). 
> Generate a timestamp after C
>  # Create the plan and requested file using this new timestamp ( that is 
> greater than C)
>  # Release table lock
> Unfortunately this has the following drawbacks
>  * Every operation must now hold the table lock when computing its plan, even 
> if it's an expensive operation and will take a while
>  * Users of HUDI cannot easily set their own instant time of an operation, 
> and this restriction would break any public APIs that allow this
> An alternate approach (suggested by [~pwason] ) was to instead have all 
> operations, including table services, perform conflict resolution checks 
> before committing. For example, clean and compaction would generate their 
> plan as usual, but when opening a transaction to write a .requested file, 
> right before creating the file they would check whether another instant with 
> a lower timestamp has appeared in the timeline, and if so, fail/abort without 
> creating the plan. Commit operations would be updated/verified to perform a 
> similar check: before creating a .requested file (during a transaction), the 
> commit operation would check whether a table service plan (clean/compact) 
> with a greater instant time has been created, and if so, abort/fail. This 
> avoids the drawbacks of the first approach, but will lead to more transient 
> failures that users have to handle.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-03-25 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
---
 Attachment: Flowchart.png
Description: 
Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where 
 # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp (x 
- 1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * 
 ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
if Job 1 runs before Job 2 it can create a compaction plan covering all instant 
times up to (x) that doesn't include instant time (x-1). Later, Job 2 will 
create instant time (x-1), but the timeline will be in a corrupted state since 
the compaction plan was supposed to include (x-1)
 ** There is a similar issue with clean. If Job 2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart.png!

 

One way this can be resolved is by combining the operations of generating 
instant time and creating a requested file in the same HUDI table transaction. 
Specifically, executing the following steps whenever any instant (commit, table 
service, etc) is scheduled
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp ( that is 
greater than C)
 # Release table lock

Unfortunately this has the following drawbacks
 * Every operation must now hold the table lock when computing its plan, even 
if it's an expensive operation and will take a while
 * Users of HUDI cannot easily set their own instant time of an operation, and 
this restriction would break any public APIs that allow this

An alternate approach (suggested by [~pwason] ) was to instead have all 
operations, including table services, perform conflict resolution checks before 
committing. For example, clean and compaction would generate their plan as 
usual, but when opening a transaction to write a .requested file, right before 
creating the file they would check whether another instant with a lower 
timestamp has appeared in the timeline, and if so, fail/abort without creating 
the plan. Commit operations would be updated/verified to perform a similar 
check: before creating a .requested file (during a transaction), the commit 
operation would check whether a table service plan (clean/compact) with a 
greater instant time has been created, and if so, abort/fail. This avoids the 
drawbacks of the first approach, but will lead to more transient failures that 
users have to handle.
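
A rough sketch of this alternate approach is below; lockManager, 
instantTimesSeenAtPlanning, listPendingInstantTimes and writeRequestedFile are 
hypothetical stand-ins, and the only point is where the conflict check sits 
relative to writing the .requested file:

{code:java}
// Sketch only: helpers below are hypothetical stand-ins, not existing Hudi APIs.
// The plan is generated outside the lock; the conflict check happens inside the
// transaction, right before the .requested file is written.
lockManager.lock();
try {
  metaClient.reloadActiveTimeline();
  // instantTimesSeenAtPlanning: instant times observed when the plan was generated
  boolean lowerInstantAppeared = listPendingInstantTimes(metaClient).stream()
      .filter(t -> !instantTimesSeenAtPlanning.contains(t))
      .anyMatch(t -> HoodieTimeline.compareTimestamps(t, HoodieTimeline.LESSER_THAN, requestedInstantTime));
  if (lowerInstantAppeared) {
    // another writer picked a smaller timestamp after our plan was generated: abort
    throw new HoodieException("Aborting schedule: a lower-timestamp instant appeared in the timeline");
  }
  writeRequestedFile(metaClient, requestedInstantTime, plan);  // hypothetical
} finally {
  lockManager.unlock();
}
{code}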

  was:
Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where 
 # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp (x 
- 1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * 
 ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
if Job 1 runs before Job 2 it can create a compaction plan covering all instant 
times up to (x) that doesn't include instant time (x-1). Later, Job 2 will 
create instant time (x-1), but the timeline will be in a corrupted state since 
the compaction plan was supposed to include (x-1)
 ** There is a similar issue with clean. If Job 2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.

One way this can be resolved is by combining the operations of generating 
instant time and creating a requested file in the same HUDI table transaction. 
Specifically, executing the following steps whenever any instant (commit, table 
service, etc) is scheduled
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed 

[jira] [Created] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-03-14 Thread Krishen Bhan (Jira)
Krishen Bhan created HUDI-7507:
--

 Summary:  ongoing concurrent writers with smaller timestamp can 
cause issues with table services
 Key: HUDI-7507
 URL: https://issues.apache.org/jira/browse/HUDI-7507
 Project: Apache Hudi
  Issue Type: Improvement
Reporter: Krishen Bhan


Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, there can be a scenario where 
 # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp (x 
- 1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * 
 ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
if Job 1 runs before Job 2 it can create a compaction plan covering all instant 
times up to (x) that doesn't include instant time (x-1). Later, Job 2 will 
create instant time (x-1), but the timeline will be in a corrupted state since 
the compaction plan was supposed to include (x-1)
 ** There is a similar issue with clean. If Job 2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.

One way this can be resolved is by combining the operations of generating 
instant time and creating a requested file in the same HUDI table transaction. 
Specifically, executing the following steps whenever any instant (commit, table 
service, etc) is scheduled
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp ( that is 
greater than C)
 # Release table lock

Unfortunately this has the following drawbacks
 * Every operation must now hold the table lock when computing its plan, even 
if it's an expensive operation and will take a while
 * Users of HUDI cannot easily set their own instant time of an operation, and 
this restriction would break any public APIs that allow this

An alternate approach (suggested by [~pwason] ) was to instead have all 
operations, including table services, perform conflict resolution checks before 
committing. For example, clean and compaction would generate their plan as 
usual, but when opening a transaction to write a .requested file, right before 
creating the file they would check whether another instant with a lower 
timestamp has appeared in the timeline, and if so, fail/abort without creating 
the plan. Commit operations would be updated/verified to perform a similar 
check: before creating a .requested file (during a transaction), the commit 
operation would check whether a table service plan (clean/compact) with a 
greater instant time has been created, and if so, abort/fail. This avoids the 
drawbacks of the first approach, but will lead to more transient failures that 
users have to handle.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7503) concurrent executions of compaction plan should not corrupt dataset

2024-03-14 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7503:
---
Description: 
Currently it is not safe for 2+ writers to concurrently call 
`org.apache.hudi.client.BaseHoodieTableServiceClient#compact` on the same 
compaction instant. This is because one writer might execute the instant and 
create an inflight, while the other writer sees the inflight and tries to roll 
it back before re-attempting to execute it (since it will assume said inflight 
was a previously failed compaction attempt).

This logic should be updated such that only one writer will actually execute 
the compaction plan at a time (and the others will fail/abort).

One approach is to use a transaction (base table lock) in conjunction with 
heartbeating, to ensure that the writer triggers a heartbeat before executing 
compaction, and any concurrent writers will use the heartbeat to check whether 
the compaction is currently being executed by another writer. Specifically, the 
compact API should execute the following steps
 # Get the instant to compact C (as usual)
 # Start a transaction
 # Check if C has an active heartbeat; if so, finish the transaction and throw 
an exception
 # Start a heartbeat for C (this will implicitly re-start the heartbeat if it 
has been started before by another job)
 # Finish the transaction
 # Run the existing compact API logic on C
 # If execution succeeds, clean up the heartbeat file. If it fails, do nothing 
(as the heartbeat will anyway be automatically expired later).

Note that this approach only holds the table lock briefly, while 
checking/starting the heartbeat

Also, this flow can be applied to the execution of clean plans and other table 
services
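
A rough sketch of this guard around the compact call is below; the transaction 
manager and heartbeat client calls are approximations of the existing APIs 
(names and signatures may differ), so treat it as an illustration of the 
locking/heartbeat ordering rather than a concrete patch:

{code:java}
// Sketch only: method names/signatures are approximate.
// Steps 2-5 happen under the table lock; step 6 runs outside it.
txnManager.beginTransaction();                        // 2. start a transaction
try {
  if (!heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
    // 3. an active heartbeat means another writer is executing this plan
    throw new HoodieException("Compaction " + compactionInstantTime
        + " is currently being executed by another writer");
  }
  heartbeatClient.start(compactionInstantTime);       // 4. (re)start the heartbeat
} finally {
  txnManager.endTransaction();                        // 5. finish the transaction
}
HoodieWriteMetadata result = table.compact(context, compactionInstantTime); // 6. existing compact logic
heartbeatClient.stop(compactionInstantTime);          // 7. clean up heartbeat on success
{code}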

  was:
Currently it is not safe for 2+ writers to concurrently call 
`org.apache.hudi.client.BaseHoodieTableServiceClient#compact` on the same 
compaction instant. This is because one writer might execute the instant and 
create an inflight, while the other writer sees the inflight and tries to roll 
it back before re-attempting to execute it (since it will assume said inflight 
was a previously failed compaction attempt).

This logic should be updated such that only one writer will actually execute 
the compaction plan at a time (and the others will fail/abort).

One approach is to use a transaction (base table lock) in conjunction with 
heartbeating, to ensure that the writer triggers a heartbeat before executing 
compaction, and any concurrent writers will use the heartbeat to check whether 
the compaction is currently being executed by another writer. Specifically, the 
compact API should execute the following steps
 # Get the instant to compact C (as usual)
 # Start a transaction
 # Check if C has an active heartbeat; if so, finish the transaction and throw 
an exception
 # Start a heartbeat for C (this will implicitly re-start the heartbeat if it 
has been started before by another job)
 # Finish the transaction
 # Run the existing compact API logic on C
 # If execution succeeds, clean up the heartbeat file. If it fails, do nothing 
(as the heartbeat will anyway be automatically expired later).

Note that this approach only holds the table lock briefly, while 
checking/starting the heartbeat

 


> concurrent executions of compaction plan should not corrupt dataset
> ---
>
> Key: HUDI-7503
> URL: https://issues.apache.org/jira/browse/HUDI-7503
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Krishen Bhan
>Priority: Minor
>
> Currently it is not safe for 2+ writers to concurrently call 
> `org.apache.hudi.client.BaseHoodieTableServiceClient#compact` on the same 
> compaction instant. This is because one writer might execute the instant and 
> create an inflight, while the other writer sees the inflight and tries to 
> roll it back before re-attempting to execute it (since it will assume said 
> inflight was a previously failed compaction attempt).
> This logic should be updated such that only one writer will actually execute 
> the compaction plan at a time (and the others will fail/abort).
> One approach is to use a transaction (base table lock) in conjunction with 
> heartbeating, to ensure that the writer triggers a heartbeat before executing 
> compaction, and any concurrent writers will use the heartbeat to check 
> whether the compaction is currently being executed by another writer. 
> Specifically, the compact API should execute the following steps
>  # Get the instant to compact C (as usual)
>  # Start a transaction
>  # Check if C has an active heartbeat; if so, finish the transaction and 
> throw an exception
>  # Start a heartbeat for C (this will implicitly re-start the heartbeat if it 
> has been started before by another job)
>  # Finish the transaction
>  # Run the existing compact API logic on C 

[jira] [Created] (HUDI-7503) concurrent executions of compaction plan should not corrupt dataset

2024-03-13 Thread Krishen Bhan (Jira)
Krishen Bhan created HUDI-7503:
--

 Summary: concurrent executions of compaction plan should not 
corrupt dataset
 Key: HUDI-7503
 URL: https://issues.apache.org/jira/browse/HUDI-7503
 Project: Apache Hudi
  Issue Type: Improvement
Reporter: Krishen Bhan


Currently it is not safe for 2+ writers to concurrently call 
`org.apache.hudi.client.BaseHoodieTableServiceClient#compact` on the same 
compaction instant. This is because one writer might execute the instant and 
create an inflight, while the other writer sees the inflight and tries to roll 
it back before re-attempting to execute it (since it will assume said inflight 
was a previously failed compaction attempt).

This logic should be updated such that only one writer will actually execute 
the compaction plan at a time (and the others will fail/abort).

One approach is to use a transaction (base table lock) in conjunction with 
heartbeating, to ensure that the writer triggers a heartbeat before executing 
compaction, and any concurrent writers will use the heartbeat to check whether 
the compaction is currently being executed by another writer. Specifically, the 
compact API should execute the following steps
 # Get the instant to compact C (as usual)
 # Start a transaction
 # Check if C has an active heartbeat; if so, finish the transaction and throw 
an exception
 # Start a heartbeat for C (this will implicitly re-start the heartbeat if it 
has been started before by another job)
 # Finish the transaction
 # Run the existing compact API logic on C
 # If execution succeeds, clean up the heartbeat file. If it fails, do nothing 
(as the heartbeat will anyway be automatically expired later).

Note that this approach only holds the table lock briefly, while 
checking/starting the heartbeat

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7337) Implement MetricsReporter for M3

2024-01-25 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7337:
---
Description: Add a new implementation of 
org.apache.hudi.metrics.MetricsReporter that reports metrics to 
[M3|https://m3db.io/]. This can be achieved by using the Java library 
[https://github.com/uber-java/tally/tree/master]  (was: Add a new 
implementation of MetricsReport that reports metrics to [M3|https://m3db.io/]. 
This can be achieved by using the Java library 
https://github.com/uber-java/tally/tree/master)

> Implement MetricsReporter for M3
> 
>
> Key: HUDI-7337
> URL: https://issues.apache.org/jira/browse/HUDI-7337
> Project: Apache Hudi
>  Issue Type: Wish
>  Components: metrics
>Reporter: Krishen Bhan
>Priority: Trivial
>
> Add a new implementation of org.apache.hudi.metrics.MetricsReporter that 
> reports metrics to [M3|https://m3db.io/]. This can be achieved by using the 
> Java library [https://github.com/uber-java/tally/tree/master]
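
For reference, a rough illustration of emitting metrics with the tally library 
is below. The builder/reporter calls are based on the tally README and may not 
match exactly, and the wiring into org.apache.hudi.metrics.MetricsReporter 
(config, lifecycle, the M3-specific reporter) is omitted, so treat it only as a 
sketch:

{code:java}
import com.uber.m3.tally.RootScopeBuilder;
import com.uber.m3.tally.Scope;
import com.uber.m3.util.Duration;

// Approximate tally usage: build a root scope that flushes to a StatsReporter
// (an M3-backed reporter would be supplied here) and emit metrics against it.
Scope scope = new RootScopeBuilder()
    .reporter(m3Reporter)                  // assumption: an M3 StatsReporter instance
    .reportEvery(Duration.ofSeconds(30));  // flush interval
scope.counter("hudi_commit_count").inc(1);
scope.gauge("hudi_commit_duration_ms").update(1234);
{code}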



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7337) Implement MetricsReporter for M3

2024-01-25 Thread Krishen Bhan (Jira)
Krishen Bhan created HUDI-7337:
--

 Summary: Implement MetricsReporter for M3
 Key: HUDI-7337
 URL: https://issues.apache.org/jira/browse/HUDI-7337
 Project: Apache Hudi
  Issue Type: Wish
  Components: metrics
Reporter: Krishen Bhan


Add a new implementation of MetricsReporter that reports metrics to 
[M3|https://m3db.io/]. This can be achieved by using the Java library 
https://github.com/uber-java/tally/tree/master



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7316) AbstractHoodieLogRecordReader should accept already-constructed HoodieTableMetaClient in order to reduce occurrences of file listing calls when reloading active timeline

2024-01-19 Thread Krishen Bhan (Jira)
Krishen Bhan created HUDI-7316:
--

 Summary: AbstractHoodieLogRecordReader should accept 
already-constructed HoodieTableMetaClient in order to reduce occurrences of 
file listing calls when reloading active timeline
 Key: HUDI-7316
 URL: https://issues.apache.org/jira/browse/HUDI-7316
 Project: Apache Hudi
  Issue Type: Improvement
Reporter: Krishen Bhan


Currently some implementors of AbstractHoodieLogRecordReader create a 
HoodieTableMetaClient on construction, which implicitly reloads the active 
timeline and causes a {{listStatus}} HDFS call. Since, when using the Spark 
engine, these are created in Spark executors, a Spark user may have hundreds to 
thousands of executors that will make a {{listStatus}} call at the same time 
(during a Spark stage). To avoid these redundant calls to the HDFS NameNode (or 
any distributed filesystem service in general), users of 
AbstractHoodieLogRecordReader and its implementations should pass in an 
already-constructed HoodieTableMetaClient.

As long as the caller passes in a HoodieTableMetaClient with the active 
timeline already loaded, and the implementation doesn't need to reload the 
timeline (for example, to get a fresher view of it), these calls are avoided in 
the executors without making the logic incorrect.
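
A rough sketch of the intended usage, with the builder option for passing the 
meta client treated as hypothetical:

{code:java}
// Sketch: construct the meta client once (e.g. on the Spark driver), load the
// timeline once, and reuse it, instead of letting every executor rebuild it and
// re-list the timeline.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(hadoopConf)            // org.apache.hadoop.conf.Configuration
    .setBasePath(basePath)
    .build();
metaClient.reloadActiveTimeline();  // single listStatus on the driver

// Then pass it into the log record reader / scanner builder instead of having the
// reader construct its own (builder method name below is hypothetical):
// scannerBuilder.withMetaClient(metaClient)
{code}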



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7313) ZookeeperBasedLockProvider should store application info in lock node

2024-01-18 Thread Krishen Bhan (Jira)
Krishen Bhan created HUDI-7313:
--

 Summary: ZookeeperBasedLockProvider should store application info 
in lock node
 Key: HUDI-7313
 URL: https://issues.apache.org/jira/browse/HUDI-7313
 Project: Apache Hudi
  Issue Type: Improvement
  Components: metrics, multi-writer
Reporter: Krishen Bhan


Currently when ZookeeperBasedLockProvider acquires a lock, it does not provide 
information on the lock holder via 
[https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/locks/InterProcessMutex.html#getLockNodeBytes|https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/locks/InterProcessMutex.html#getLockNodeBytes--]
 which can be used to store info about the application that acquired the lock. 
Updating HUDI to implement this API would help users easily identify 
information about the application that acquired the ZooKeeper lock when they 
use ZooKeeper tooling (such as zkcli).
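
A rough sketch of how the lock holder info could be attached, assuming we 
subclass Curator's InterProcessMutex (the lockHolderInfo contents here are 
illustrative):

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

// Sketch: attach writer/application info to the ZK lock node by overriding
// getLockNodeBytes(), so tools like zkCli can show who currently holds the lock.
public class HoodieInterProcessMutex extends InterProcessMutex {
  private final String lockHolderInfo;

  public HoodieInterProcessMutex(CuratorFramework client, String path, String lockHolderInfo) {
    super(client, path);
    this.lockHolderInfo = lockHolderInfo;  // e.g. application id, host, writer config
  }

  @Override
  protected byte[] getLockNodeBytes() {
    return lockHolderInfo.getBytes(StandardCharsets.UTF_8);
  }
}
{code}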



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7308) LockManager::unlock should not call updateLockHeldTimerMetrics if lockDurationTimer has not been started

2024-01-17 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7308:
---
Status: In Progress  (was: Open)

> LockManager::unlock should not call updateLockHeldTimerMetrics if 
> lockDurationTimer has not been started
> 
>
> Key: HUDI-7308
> URL: https://issues.apache.org/jira/browse/HUDI-7308
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: metrics, multi-writer
>Affects Versions: 1.0.0-beta1
>Reporter: Krishen Bhan
>Priority: Trivial
>
> If an exception is thrown in 
> org.apache.hudi.client.transaction.lock.LockManager#lock, it is possible for 
> lockDurationTimer in HoodieLockMetrics to be stopped before it has been 
> started, which throws and bubbles up a `HoodieException("Timer was not 
> started")` exception (rather than the actual exception that occurred when 
> trying to acquire the lock). Specifically, this can happen due to the 
> following scenario
>  # The BaseHoodieTableServiceClient calls 
> `org.apache.hudi.client.BaseHoodieTableServiceClient#completeClustering` , 
> which in turn calls 
> `org.apache.hudi.client.transaction.TransactionManager#beginTransaction` 
> within a `try` block
>  # During 
> `org.apache.hudi.client.transaction.TransactionManager#beginTransaction` the 
> LockManager lock API 
> `org.apache.hudi.client.transaction.lock.LockManager#lock` is called
>  # Inside `org.apache.hudi.client.transaction.lock.LockManager#lock`, the 
> `java.util.concurrent.locks.Lock#tryLock(long, 
> java.util.concurrent.TimeUnit)` throws some exception. Because of this 
> exception, the statement
> `metrics.updateLockAcquiredMetric();` is not executed. This means that the 
> `org.apache.hudi.common.util.HoodieTimer#startTimer` method was never called 
> for the timer HoodieLockMetrics member variable 
> `org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics#lockDurationTimer`
>  
>  # The exception in (3) bubbles up back to 
> `org.apache.hudi.client.BaseHoodieTableServiceClient#completeClustering`. 
> Since this is in a `try` block, the `catch` and `finally` blocks are 
> executed. When `finally` is executed though, 
> `org.apache.hudi.client.transaction.TransactionManager#endTransaction` is 
> called
>  # During 
> `org.apache.hudi.client.transaction.TransactionManager#endTransaction` the 
> LockManager unlock API 
> `org.apache.hudi.client.transaction.lock.LockManager#unlock` is called. 
> During the execution of `metrics.updateLockHeldTimerMetrics();`, the method 
> `org.apache.hudi.common.util.HoodieTimer#endTimer` is called for 
> `org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics#lockDurationTimer`. 
> This throws a `HoodieException("Timer was not started")` exception because 
> the corresponding `org.apache.hudi.common.util.HoodieTimer#startTimer` method 
> was never called
> The issue here is that the caller (`BaseHoodieTableServiceClient` in this 
> case) should have ended up re-throwing the exception thrown in (3) while 
> trying to start the transaction in 
> `org.apache.hudi.client.transaction.TransactionManager#beginTransaction`. 
> Instead though, because the caller safely "cleaned up" by calling 
> `org.apache.hudi.client.transaction.TransactionManager#endTransaction` (in a 
> `finally`), the `HoodieException("Timer was not started")` exception was 
> raised instead, suppressing the exception from (3), which is the actual root 
> cause. The execution of 
> `org.apache.hudi.client.transaction.TransactionManager#endTransaction` should 
> have completed without throwing this additional exception, which would have 
> led the caller to throw the exception in (3) before exiting.
> Although resolving this would not prevent the overall operation from failing, 
> it would provide better observability on the actual root cause exception (the 
> one from (3)). 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7308) LockManager::unlock should not call updateLockHeldTimerMetrics if lockDurationTimer has not been started

2024-01-17 Thread Krishen Bhan (Jira)
Krishen Bhan created HUDI-7308:
--

 Summary: LockManager::unlock should not call 
updateLockHeldTimerMetrics if lockDurationTimer has not been started
 Key: HUDI-7308
 URL: https://issues.apache.org/jira/browse/HUDI-7308
 Project: Apache Hudi
  Issue Type: Bug
  Components: metrics, multi-writer
Affects Versions: 1.0.0-beta1
Reporter: Krishen Bhan


If an exception is thrown in 
org.apache.hudi.client.transaction.lock.LockManager#lock, it is possible for 
lockDurationTimer in HoodieLockMetrics to be stopped before it has been 
started, which throws and bubbles up a `HoodieException("Timer was not 
started")` exception (rather than the actual exception that occurred when 
trying to acquire the lock). Specifically, this can happen due to the following scenario
 # The BaseHoodieTableServiceClient calls 
`org.apache.hudi.client.BaseHoodieTableServiceClient#completeClustering` , 
which in turn calls 
`org.apache.hudi.client.transaction.TransactionManager#beginTransaction` within 
a `try` block
 # During 
`org.apache.hudi.client.transaction.TransactionManager#beginTransaction` the 
LockManager lock API `org.apache.hudi.client.transaction.lock.LockManager#lock` 
is called
 # Inside `org.apache.hudi.client.transaction.lock.LockManager#lock`, the 
`java.util.concurrent.locks.Lock#tryLock(long, java.util.concurrent.TimeUnit)` 
throws some exception. Because of this exception, the statement
`metrics.updateLockAcquiredMetric();` is not executed. This means that the 
`org.apache.hudi.common.util.HoodieTimer#startTimer` method was never called 
for the timer HoodieLockMetrics member variable 
`org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics#lockDurationTimer`
 
 # The exception in (3) bubbles up back to 
`org.apache.hudi.client.BaseHoodieTableServiceClient#completeClustering`. Since 
this is in a `try` block, the `catch` and `finally` blocks are executed. When 
`finally` is executed though, 
`org.apache.hudi.client.transaction.TransactionManager#endTransaction` is called
 # During 
`org.apache.hudi.client.transaction.TransactionManager#endTransaction` the 
LockManager unlock API 
`org.apache.hudi.client.transaction.lock.LockManager#unlock` is called. During 
the execution of `metrics.updateLockHeldTimerMetrics();`, the method 
`org.apache.hudi.common.util.HoodieTimer#endTimer` is called for 
`org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics#lockDurationTimer`. 
This throws a `HoodieException("Timer was not started")` exception because the 
corresponding `org.apache.hudi.common.util.HoodieTimer#startTimer` method was 
never called

The issue here is that the caller (`BaseHoodieTableServiceClient` in this case) 
should have ended up re-throwing the exception thrown in (3) while trying to 
start the transaction in 
`org.apache.hudi.client.transaction.TransactionManager#beginTransaction`. 
Instead though, because the caller safely "cleaned up" by calling 
`org.apache.hudi.client.transaction.TransactionManager#endTransaction` (in a 
`finally`), the `HoodieException("Timer was not started")` exception was raised 
instead, suppressing the exception from (3), which is the actual root cause. 
The execution of 
`org.apache.hudi.client.transaction.TransactionManager#endTransaction` should 
have completed without throwing this additional exception, which would have led 
the caller to throw the exception in (3) before exiting.

Although resolving this would not prevent the overall operation from failing, 
it would provide better observability on the actual root cause exception (the 
one from (3)). 
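
One possible shape of the fix (a sketch only, not the actual patch) is to have 
LockManager track whether the lock/timer was actually started, and make 
unlock() skip the held-duration metric otherwise:

{code:java}
// Sketch only: keep a flag in LockManager so unlock() never ends a timer that
// lock() did not start (e.g. when tryLock threw before updateLockAcquiredMetric ran).
private volatile boolean lockAcquired = false;

public void lock() {
  // ... existing retry loop around the lock provider's tryLock ...
  // only reached when the lock was actually acquired:
  metrics.updateLockAcquiredMetric();      // starts lockDurationTimer internally
  lockAcquired = true;
}

public void unlock() {
  if (lockAcquired) {
    metrics.updateLockHeldTimerMetrics();  // safe: the timer was started in lock()
    lockAcquired = false;
  }
  // ... existing release of the underlying lock provider ...
}
{code}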



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-17 Thread Krishen Bhan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751866#comment-17751866
 ] 

Krishen Bhan edited comment on HUDI-6596 at 8/17/23 7:47 PM:
-

I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this 
change on the hudi repo, but realized there was an issue: the assumptions made 
in the rollback implementation 
{{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable,
 java.lang.String, 
org.apache.hudi.common.util.Option,
 java.lang.String)}} (both the existing one and my proposed change) are 
inconsistent with the changes here in [https://github.com/apache/hudi/pull/8849]
Specifically, 
{{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable,
 java.lang.String, 
org.apache.hudi.common.util.Option,
 java.lang.String)}} seems to have been implemented (based on code and comments) 
under the assumption that a rollback operation will delete all instant files 
from {{commit instant to rollback}} before completing the rollback operation 
itself, which is what I had thought when I was working on my rollback fix(es). 
But it seems that after [https://github.com/apache/hudi/pull/8849] this is 
(retroactively) incorrect, as now we are deleting instant files from {{commit 
instant to rollback}} after completing the rollback instant, leaving the 
rollback operation as a special case where it is possible for the rollback 
instant to be complete even if the actual rollback operation has not fully 
completed (due to failing after completing the rollback instant but before 
cleaning up the instant files of {{commit instant to rollback}}). Although 
[https://github.com/apache/hudi/pull/8849] handles this by delegating the 
deletion of instant files from {{commit instant to rollback}} to a later 
clean/rollbackFailedWrites operation, I think the intention/invariants/rules of 
how rollback operates are a bit ambiguous to me and something that should be 
reconciled. To further add complexity, it seems that based on 
[https://github.com/kbuci/hudi/blob/35be9bbbc7ef7ae6ad0a4955da78da4c0463074f/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L630]
 it is also currently legal to remove a requested rollback plan, in other words 
"rolling back" a pending rollback plan.

Also, after taking another look at the reason for 
[https://github.com/apache/hudi/pull/8849], I think the fix there can be 
reverted and handled alternatively, since it seems to me that fixing/finding 
bugs with getPendingRollbackInfo and preventing concurrent rollback 
scheduling/execution might prevent the underlying issue/reason for that PR in 
the first place


was (Author: JIRAUSER301521):
I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this 
change on the hudi repo, but realized there was an issue since the assumptions 
made in the rollback implementation (both the existing one and my proposed 
change) where 
{{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable,
 java.lang.String, 
org.apache.hudi.common.util.Option,
 java.lang.String)}}  is inconsistent with the changes here in 
[https://github.com/apache/hudi/pull/8849]
Specifically,  
{{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable,
 java.lang.String, 
org.apache.hudi.common.util.Option,
 java.lang.String)}} seems to have been implemented (based on code and comments) 
under the assumption that a rollback operation will delete all instant files 
from {{commit instant to rollback}} before completing the rollback operation 
itself, which is what I had thought when I was working on my rollback fix(es). 
But it seems that after [https://github.com/apache/hudi/pull/8849] this is 
(retroactively) incorrect as now we are deleting instant files from {{commit 
instant to rollback}} after completing the rollback instant, leaving the 
rollback operation as a special case where it is possible for the rollback 
instant to be complete even if the actual rollback operation has not fully 
completed (due to failing after completing the rollback instant but before 
cleaning up the instant files of {{commit instant to rollback}}). Although 
[https://github.com/apache/hudi/pull/8849] handles this by delegating the 
deletion of instant files from {{commit instant to rollback}} to a later 
clean/rollbackFailedWrites operation, I think the intention/invariants/rules of 
how rollback operates are a bit ambiguous to me and something that should be 
reconciled.

Also after taking another look at the reason for 
[https://github.com/apache/hudi/pull/8849] I think the fix there can be 
reverted and handled alternatively, since it seems to me that fixing/finding  
bugs with getPendingRollbackInfo and preventing concurrent 

[jira] [Commented] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-17 Thread Krishen Bhan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755672#comment-17755672
 ] 

Krishen Bhan commented on HUDI-6596:


{quote}  I think we should use a different name to skipLocking, if acquiring 
the lock is skipped because we already acquired the lock, then we should use a 
different variable something like isLockAcquired or something. 
{quote}
I think that was the convention I noticed, but sure I can address that once I 
post the PR for review, thanks!
{quote}
Without complicating the rollback logic, let us see all the cases where we use 
rollback.
1. Rollback failed writes: Lock has to be acquired until scheduling the 
rollback plans for pending instantsToRollback and for execution it need not 
acquire a lock.
2. Rollback a specific instant: Only schedule step needs to be under a lock.
3. Restore operation: Entire operation needs to be under a lock. 
 
For rollbackFailedWrites method, break it down to two Stages
 
Stage 1: Scheduling stage
 
Step 1: Acquire lock and reload active timeline
Step 2: getInstantsToRollback
Step 3: removeInflightFilesAlreadyRolledBack
Step 4: getPendingRollbackInfos
Step 5: Use existing plan or schedule rollback
Step 6: Release lock
 
Stage 2: Execution stage
 
Step 7: Check if heartbeat exist for pending rollback plan. If yes abort else 
start an heartbeat and proceed further for executing it.
{quote} 
For now in this ticket the intention is to just focus on (2) `Rollback a 
specific instant`. Depending on how this implementation goes, I think we could 
follow your approach for (1) `rollbackFailedWrites` when I create a ticket to 
address that. Sorry, I should rename this JIRA ticket to clarify that.

{quote}
Rollback operation are not that common. We only do rollback if something fails. 
So, it is not like .clean or .commit operations. So, we should be ok in seeing 
some noise.
{quote}
The issue is that although the chance of an individual job transiently failing 
on an upsert is low, as we add more concurrent writers to our pool of upsert 
jobs on a dataset, the chance that at least one upsert job will fail increases. 
In addition, in the case of underlying infrastructure (like Spark/YARN) service 
degradations (which we've seen internally in our organization), it's possible 
that all writers might fail during an upsert/rollback in the same window of 
time. This means that we should try to gracefully/resiliently account for the 
chance that there is a concurrent rollback going on during a job's upsert 
operation, or even a concurrent rollback that itself has failed. Although 
locking the table during a rollback is out of the question, we can still go 
with an approach like I suggested in 
https://issues.apache.org/jira/browse/HUDI-6596?focusedCommentId=17751201=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17751201
 , to greatly reduce the chance that sporadic rollbacks/failures will cause all 
concurrent upsert jobs to fail.



>  Propose rollback implementation changes to guard against concurrent jobs
> -
>
> Key: HUDI-6596
> URL: https://issues.apache.org/jira/browse/HUDI-6596
> Project: Apache Hudi
>  Issue Type: Wish
>Reporter: Krishen Bhan
>Priority: Trivial
>
> h1. Issue
> The existing rollback API in 0.14 
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
>  executes a rollback plan, either taking in an existing rollback plan 
> provided by the caller for a previous rollback attempt, or scheduling a 
> new rollback instant if none is provided. Currently it is not safe for two 
> concurrent jobs to call this API (when skipLocking=False and the callers 
> aren't already holding a lock), as this can lead to an issue where multiple 
> rollback requested plans are created or two jobs are executing the same 
> rollback instant at the same time.
> h1. Proposed change
> One way to resolve this issue is to refactor this rollback function such that 
> if skipLocking=false, the following steps are followed
>  # Acquire the table lock
>  # Reload the active timeline
> Look at the active timeline to see if there is an inflight rollback instant 
> from a previous rollback attempt; if it exists then assign this as the 
> rollback plan to execute. Also, check if a pending rollback plan was passed 
> in by caller. Then it executes the following steps depending on whether the 
> caller passed a pending rollback instant plan.
>  ##  [a] If a pending inflight rollback plan was passed in by caller, then 
> check that there is a previous attempted rollback instant on timeline (and 
> that the instant times match) and continue to use this rollback plan. If that 
> isn't the 

[jira] [Comment Edited] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-08 Thread Krishen Bhan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751866#comment-17751866
 ] 

Krishen Bhan edited comment on HUDI-6596 at 8/8/23 11:48 PM:
-

I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this 
change on the hudi repo, but realized there was an issue: the assumptions made 
in the rollback implementation 
{{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable,
 java.lang.String, 
org.apache.hudi.common.util.Option,
 java.lang.String)}} (both the existing one and my proposed change) are 
inconsistent with the changes here in [https://github.com/apache/hudi/pull/8849]
Specifically, 
{{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable,
 java.lang.String, 
org.apache.hudi.common.util.Option,
 java.lang.String)}} seems to have been implemented (based on code and comments) 
under the assumption that a rollback operation will delete all instant files 
from {{commit instant to rollback}} before completing the rollback operation 
itself, which is what I had thought when I was working on my rollback fix(es). 
But it seems that after [https://github.com/apache/hudi/pull/8849] this is 
(retroactively) incorrect, as now we are deleting instant files from {{commit 
instant to rollback}} after completing the rollback instant, leaving the 
rollback operation as a special case where it is possible for the rollback 
instant to be complete even if the actual rollback operation has not fully 
completed (due to failing after completing the rollback instant but before 
cleaning up the instant files of {{commit instant to rollback}}). Although 
[https://github.com/apache/hudi/pull/8849] handles this by delegating the 
deletion of instant files from {{commit instant to rollback}} to a later 
clean/rollbackFailedWrites operation, I think the intention/invariants/rules of 
how rollback operates are a bit ambiguous to me and something that should be 
reconciled.

Also, after taking another look at the reason for 
[https://github.com/apache/hudi/pull/8849], I think the fix there can be 
reverted and handled alternatively, since it seems to me that fixing/finding 
bugs with getPendingRollbackInfo and preventing concurrent rollback 
scheduling/execution might prevent the underlying issue/reason for that PR in 
the first place


was (Author: JIRAUSER301521):
I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this 
change on the hudi repo, but realized there was an issue since the assumptions 
made in the rollback implementation (both the existing one and my proposed 
change) where 
{{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable,
 java.lang.String, 
org.apache.hudi.common.util.Option,
 java.lang.String)}}  is inconsistent with the changes here in 
[https://github.com/apache/hudi/pull/8849]
Specifically,  
{{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable,
 java.lang.String, 
org.apache.hudi.common.util.Option,
 java.lang.String)}} seems to have been implemented (based on code and comments) 
under the assumption that a rollback operation will delete all instant files 
from {{commit instant to rollback}} before completing the rollback operation 
itself, which is what I had thought when I was working on my rollback fix(es). 
But it seems that after [https://github.com/apache/hudi/pull/8849] this is 
(retroactively) incorrect as now we are deleting instant files from {{commit 
instant to rollback}} after completing the rollback instant, leaving the 
rollback operation as a special case where it is possible for the rollback 
instant to be complete even if the actual rollback operation has not fully 
completed (due to failing after completing the rollback instant but before 
cleaning up the instant files of {{commit instant to rollback}}). Although 
[https://github.com/apache/hudi/pull/8849] handles this by delegating the 
deletion of instant files from {{commit instant to rollback}} to a later 
clean/rollbackFailedWrites operation, I think the intention/invariants/rules of 
how rollback operates are a bit ambiguous to me and something that should be 
reconciled.

>  Propose rollback implementation changes to guard against concurrent jobs
> -
>
> Key: HUDI-6596
> URL: https://issues.apache.org/jira/browse/HUDI-6596
> Project: Apache Hudi
>  Issue Type: Wish
>Reporter: Krishen Bhan
>Priority: Trivial
>
> h1. Issue
> The existing rollback API in 0.14 
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
>  executes a rollback 

[jira] [Commented] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-07 Thread Krishen Bhan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751866#comment-17751866
 ] 

Krishen Bhan commented on HUDI-6596:


I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this 
change on the hudi repo, but realized there was an issue: the assumptions made 
in the rollback implementation 
{{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable,
 java.lang.String, 
org.apache.hudi.common.util.Option,
 java.lang.String)}} (both the existing one and my proposed change) are 
inconsistent with the changes here in [https://github.com/apache/hudi/pull/8849]
Specifically, 
{{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable,
 java.lang.String, 
org.apache.hudi.common.util.Option,
 java.lang.String)}} seems to have been implemented (based on code and comments) 
under the assumption that a rollback operation will delete all instant files 
from {{commit instant to rollback}} before completing the rollback operation 
itself, which is what I had thought when I was working on my rollback fix(es). 
But it seems that after [https://github.com/apache/hudi/pull/8849] this is 
(retroactively) incorrect, as now we are deleting instant files from {{commit 
instant to rollback}} after completing the rollback instant, leaving the 
rollback operation as a special case where it is possible for the rollback 
instant to be complete even if the actual rollback operation has not fully 
completed (due to failing after completing the rollback instant but before 
cleaning up the instant files of {{commit instant to rollback}}). Although 
[https://github.com/apache/hudi/pull/8849] handles this by delegating the 
deletion of instant files from {{commit instant to rollback}} to a later 
clean/rollbackFailedWrites operation, I think the intention/invariants/rules of 
how rollback operates are a bit ambiguous to me and something that should be 
reconciled.

>  Propose rollback implementation changes to guard against concurrent jobs
> -
>
> Key: HUDI-6596
> URL: https://issues.apache.org/jira/browse/HUDI-6596
> Project: Apache Hudi
>  Issue Type: Wish
>Reporter: Krishen Bhan
>Priority: Trivial
>
> h1. Issue
> The existing rollback API in 0.14 
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
>  executes a rollback plan, either taking in an existing rollback plan 
> provided by the caller for a previous rollback attempt, or scheduling a 
> new rollback instant if none is provided. Currently it is not safe for two 
> concurrent jobs to call this API (when skipLocking=False and the callers 
> aren't already holding a lock), as this can lead to an issue where multiple 
> rollback requested plans are created or two jobs are executing the same 
> rollback instant at the same time.
> h1. Proposed change
> One way to resolve this issue is to refactor this rollback function such that 
> if skipLocking=false, the following steps are followed
>  # Acquire the table lock
>  # Reload the active timeline
> Look at the active timeline to see if there is an inflight rollback instant 
> from a previous rollback attempt; if it exists then assign this as the 
> rollback plan to execute. Also, check if a pending rollback plan was passed 
> in by caller. Then it executes the following steps depending on whether the 
> caller passed a pending rollback instant plan.
>  ##  [a] If a pending inflight rollback plan was passed in by caller, then 
> check that there is a previous attempted rollback instant on timeline (and 
> that the instant times match) and continue to use this rollback plan. If that 
> isn't the case, then raise a rollback exception since this means another job 
> has concurrently already executed this plan. Note that in a valid HUDI 
> dataset there can be at most one rollback instant for a corresponding commit 
> instant, which is why if we no longer see a pending rollback in timeline in 
> this phase we can safely assume that it had already been executed to 
> completion.
>  ##  [b] If no pending inflight rollback plan was passed in by caller and no 
> pending rollback instant was found in timeline earlier, then schedule a new 
> rollback plan
>  # Now that a rollback plan and requested rollback instant time has been 
> assigned, check for an active heartbeat for the rollback instant time. If 
> there is one, then abort the rollback as that means there is a concurrent job 
> executing that rollback. If not, then start a heartbeat for that rollback 
> instant time.
>  # Release the table lock
>  # Execute the rollback plan and complete the rollback instant. 
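
A rough sketch of these steps is below; the heartbeat calls are approximate and 
findPendingRollbackFor, scheduleNewRollback and executeRollbackPlan are 
hypothetical stand-ins, so this only illustrates where the lock and the 
heartbeat guard sit:

{code:java}
// Sketch only: helper names are hypothetical and the heartbeat calls approximate.
// Steps 1-4 (schedule-or-reuse plus heartbeat guard) run under the table lock,
// the actual rollback execution runs outside it.
lockManager.lock();                                        // 1. acquire table lock
String rollbackInstantTime;
try {
  metaClient.reloadActiveTimeline();                       // 2. reload timeline
  Option<HoodieInstant> pending = findPendingRollbackFor(commitToRollback);   // hypothetical
  if (pending.isPresent()) {
    rollbackInstantTime = pending.get().getTimestamp();    // 3a. reuse existing plan
  } else {
    rollbackInstantTime = scheduleNewRollback(commitToRollback);              // 3b. hypothetical
  }
  if (!heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
    throw new HoodieException("Rollback " + rollbackInstantTime
        + " is already being executed by another job");    // 4. abort on active heartbeat
  }
  heartbeatClient.start(rollbackInstantTime);              // 4. claim via heartbeat
} finally {
  lockManager.unlock();                                    // 5. release table lock
}
executeRollbackPlan(rollbackInstantTime);                  // 6. execute (hypothetical)
{code}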

[jira] [Commented] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-04 Thread Krishen Bhan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751201#comment-17751201
 ] 

Krishen Bhan commented on HUDI-6596:


Sorry, I realized that I might not have fully addressed your concern about 
idempotency/concurrent retries. Currently with my proposal, if concurrent jobs 
try to execute a rollback instant around the same time, even though only one 
job will execute the rollback at a time, the other jobs will all instantly 
fail, which may not always be desirable (as that exception might propagate 
upwards and cause the entire job to fail, creating more noise). In order to 
minimize this, I think I am going to revise my approach to 
- Add a new custom write config value "time to wait for rollback"
- Remove step 4
- Add the following step in between 5 and 6:
{code:java}
 Acquire the table lock, check if the rollback heartbeat has expired, and if so 
start the rollback heartbeat again and release lock. Let's call this "claim the 
heartbeat". If the rollback heartbeat is still active, then 
- If "time to wait for rollback" isn't enabled, the raise an exception and 
release lock
- If "time to wait for rollback" is enabled, then release the table lock and 
keep on trying (every x seconds) to "claim the heartbeat". If we are not able 
to "claim the heartbeat" within "time to wait for rollback" seconds then we 
raise an exception, like the previous case. Otherwise, if we successfully 
"claim the heartbeat", then this means the recent rollback attempt either 
finished or failed. In order to determine which, we reload the active timeline 
again. If the pending rollback instant is still in timeline, we execute it, 
otherwise we safely return False. Note that although we are repeatedly holding 
and releasing the table lock every time we try to "claim the heartbeat", we are 
only making 1-2 DFS calls (to check the heartbeat file and create it if 
possible) while the lock is held.{code}
Although this does unfortunately add some code complexity, I plan to add this 
to my organization's internal HUDI fork since I think it will be helpful for us 
to reduce frequency of failures, and I figured I should share this here (and on 
the PR I will create) in case this might be useful for other Apache HUDI users. 
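
A rough sketch of the "claim the heartbeat" loop described above, with 
hypothetical helper names (isHeartbeatExpired/start are approximations of the 
heartbeat client API):

{code:java}
// Sketch only: helper names are hypothetical. Each attempt holds the table lock just
// long enough for the 1-2 DFS calls needed to check and (re)start the heartbeat.
long deadlineMs = System.currentTimeMillis() + timeToWaitForRollbackMs;
boolean claimed = false;
while (!claimed) {
  lockManager.lock();
  try {
    if (heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
      heartbeatClient.start(rollbackInstantTime);          // "claim the heartbeat"
      claimed = true;
    }
  } finally {
    lockManager.unlock();
  }
  if (!claimed) {
    if (!timeToWaitForRollbackEnabled || System.currentTimeMillis() > deadlineMs) {
      throw new HoodieException("Another job is still executing rollback " + rollbackInstantTime);
    }
    try {
      Thread.sleep(retryIntervalMs);                       // wait before trying to claim again
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new HoodieException("Interrupted while waiting to claim rollback heartbeat", e);
    }
  }
}
// After claiming: reload the active timeline and execute the rollback only if the
// pending rollback instant is still present; otherwise it already completed.
{code}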

 

 

 

 

 


 

>  Propose rollback implementation changes to guard against concurrent jobs
> -
>
> Key: HUDI-6596
> URL: https://issues.apache.org/jira/browse/HUDI-6596
> Project: Apache Hudi
>  Issue Type: Wish
>Reporter: Krishen Bhan
>Priority: Trivial
>
> h1. Issue
> The existing rollback API in 0.14 
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
>  executes a rollback plan, either taking in an existing rollback plan 
> provided by the caller for a previous rollback or attempt, or scheduling a 
> new rollback instant if none is provided. Currently it is not safe for two 
> concurrent jobs to call this API (when skipLocking=False and the callers 
> aren't already holding a lock), as this can lead to an issue where multiple 
> rollback requested plans are created or two jobs are executing the same 
> rollback instant at the same time.
> h1. Proposed change
> One way to resolve this issue is to refactor this rollback function such that 
> if skipLocking=false, the following steps are followed
>  # Acquire the table lock
>  # Reload the active timeline
>  # Look at the active timeline to see if there is a inflight rollback instant 
> from a previous rollback attempt, if it exists then assign this is as the 
> rollback plan to execute. Also, check if a pending rollback plan was passed 
> in by caller. Then it executes the following steps depending on whether the 
> caller passed a pending rollback instant plan.
>  ##  [a] If a pending inflight rollback plan was passed in by caller, then 
> check that there is a previous attempted rollback instant on timeline (and 
> that the instant times match) and continue to use this rollback plan. If that 
> isn't the case, then raise a rollback exception since this means another job 
> has concurrently already executed this plan. Note that in a valid HUDI 
> dataset there can be at most one rollback instant for a corresponding commit 
> instant, which is why if we no longer see a pending rollback in timeline in 
> this phase we can safely assume that it had already been executed to 
> completion.
>  ##  [b] If no pending inflight rollback plan was passed in by caller and no 
> pending rollback instant was found in timeline earlier, then schedule a new 
> rollback plan
>  # Now that a rollback plan and requested rollback instant time has been 
> assigned, check for an active heartbeat for the rollback instant time. 

[jira] [Comment Edited] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-03 Thread Krishen Bhan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750879#comment-17750879
 ] 

Krishen Bhan edited comment on HUDI-6596 at 8/3/23 8:15 PM:


Thanks for the reply!
> The table lock could become a bottleneck, potentially leading to performance 
> issues as other operations might be blocked too. It might be useful to 
> consider how frequently you expect concurrent rollbacks to occur and whether 
> this might create a performance problem.

Based on our use case at least, I would expect attempted concurrent rollbacks 
to be uncommon / an edge case for now, and more rare as my organization 
internally makes more fixes/changes to our orchestration process. 
But you bring up a good concern: because this locks around the scheduleRollback 
call, it will block other hudi writers from progressing while it generates the 
rollback plan, which might be relatively time-consuming (since with the Spark 
engine, HUDI 0.10 can end up launching a new Spark stage for this), at least 
compared to other places where HUDI holds the table lock. If this becomes a 
concern for other HUDI users (and we want HUDI OCC locking to avoid holding the 
table lock while reading/creating an arbitrary/unbounded number of 
instants/data files in parallel), one solution I can think of would be to 
refactor the HoodieTable scheduleRollback call such that we can first create 
the rollback plan before acquiring the table lock (in my proposed approach), 
and then later, if we actually call HoodieTable scheduleRollback, we just pass 
in this existing rollback plan. This way the actual "work" needed to generate 
the rollback plan is done before locking, and we only check and update instant 
file(s) while under the lock (and since in my proposed approach we anyway only 
schedule a rollback if none has ever been scheduled before, I think we 
shouldn't have to worry about the rollback plan we created becoming 
invalid/stale). Though I'm not sure how feasible this is since it may affect 
public APIs.

> Can we ensure rollbacks are idempotent in case of repeated failures or 
> retries?

Yes, making sure rollbacks are idempotent (in the sense that a pending rollback 
can keep on being retried until success) is a must. Both the current/original 
implementation and the proposed approach should address this. Both 
approaches/implementations handle the case where a rollback is pending but the 
instant to roll back is gone (where we need to re-materialize the instant info 
and tell the rollback execution to not actually delete instants). Also, in the 
proposed approach here, in step 3 we are directly using the pending rollback 
instant that we observe in the timeline, if one exists. Unfortunately the logic 
and my phrasing for step 3 is a bit awkward: because the caller can pass in a 
pendingRollbackInfo that it expects to be executed, I decided that we couldn't 
just ignore it, but rather we needed to validate that this pendingRollbackInfo 
is the same as the pending rollback instant we just saw in the timeline, and 
abort the rollback if not. 

> Worth considering edge cases where heartbeats could become stale or be missed 
> (e.g., if a job crashes without properly closing its heartbeat). Handling 
> these scenarios gracefully will help ensure that rollbacks can still proceed 
> when needed. 

Yeah, if a failed rollback job doesn't clean up the heartbeat after failure, 
any rollback attempt right after it (within the interval of `[heartbeat timeout 
* (allowed heartbeat misses + 1)]`, I think) will fail. The alternative (that I 
can think of) would be to simply allow the chance of 2+ rollback jobs working on 
the same rollback instant. The issue though is that even if the HoodieTable 
executeRollback implementation prevents the dataset from being corrupted and 
just produces a retry-able failure, I thought it would be noisy/tricky for a 
user to understand/debug. So I decided to add Step 4, since from my 
perspective/understanding it was a tradeoff between "always failing with an 
easy-to-understand retry-able exception" versus "rarely failing with a 
hard-to-understand retry-able exception". 
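
For reference, the window I mean is just the heartbeat expiry threshold. A 
rough illustration (assuming these are the relevant HoodieWriteConfig 
accessors; the names may differ across versions):

{code:java}
// Rough illustration of how long a stale, never-cleaned-up heartbeat blocks the
// next rollback attempt. Accessor names are my assumption of the relevant configs.
long heartbeatIntervalMs = config.getHoodieClientHeartbeatIntervalInMs();
long tolerableMisses = config.getHoodieClientHeartbeatTolerableMisses();
long blockedWindowMs = heartbeatIntervalMs * (tolerableMisses + 1);
// A rollback attempted within ~blockedWindowMs of the crash still sees an
// "active" heartbeat and aborts with a retry-able exception.
{code}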


was (Author: JIRAUSER301521):
Thanks for the reply!
> The table lock could become a bottleneck, potentially leading to performance 
> issues as other operations might be blocked too. It might be useful to 
> consider how frequently you expect concurrent rollbacks to occur and whether 
> this might create a performance problem.

Based on our use case at least, I would expect attempted concurrent rollbacks 
to be uncommon / an edge case for now, and more rare as my organization 
internally makes more fixes/changes to our orchestration process. 
But you bring up a good concern that because this locks around the 
scheduledRollback call , it will block other hudi writers  from progressing 
while it generates the rollback plan, 

[jira] [Commented] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-03 Thread Krishen Bhan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750879#comment-17750879
 ] 

Krishen Bhan commented on HUDI-6596:


Thanks for the reply!
> The table lock could become a bottleneck, potentially leading to performance 
> issues as other operations might be blocked too. It might be useful to 
> consider how frequently you expect concurrent rollbacks to occur and whether 
> this might create a performance problem.

Based on our use case at least, I would expect attempted concurrent rollbacks 
to be uncommon / an edge case for now, and to become rarer as my organization 
internally makes more fixes/changes to our orchestration process. 
But you bring up a good concern: because this locks around the scheduleRollback 
call, it will block other hudi writers from progressing while it generates the 
rollback plan, which might be relatively time-consuming (in Spark, HUDI 0.10 can 
end up launching a new Spark stage for this), at least compared to other places 
where HUDI holds the table lock. If this becomes a concern for other HUDI users 
(and we want HUDI OCC locking to avoid holding the table lock while 
reading/creating an arbitrary/unbounded number of instants/data files in 
parallel), one solution I can think of would be to refactor the HoodieTable 
scheduleRollback call so that we first create the rollback plan before acquiring 
the table lock (in my proposed approach), and then, if we actually call 
HoodieTable scheduleRollback, we just pass in this existing rollback plan. This 
way the actual "work" needed to generate the rollback plan is done before 
locking, and we only check and update instant file(s) while under the lock (and 
since in my proposed approach we only schedule a rollback if none has ever been 
scheduled before, I think we shouldn't have to worry about the rollback plan we 
created becoming invalid/stale). Though I'm not sure how feasible this is, since 
it may affect public APIs.

> Can we ensure rollbacks are idempotent in case of repeated failures or 
> retries?

Yes, making sure rollbacks are idempotent (in the sense that a pending rollback 
can keep being retried until it succeeds) is a must. Both the current/original 
implementation and the proposed approach should address this. Both 
approaches/implementations handle the case where the rollback is pending but the 
instant to roll back is gone (where we need to re-materialize the instant info 
and tell the rollback execution not to actually delete instants). Also, in the 
proposed approach, in step 3 we directly use the pending rollback instant that 
we observe in the timeline, if one exists. Unfortunately the logic and my 
phrasing for step 3 is a bit awkward: because the caller can pass in a 
pendingRollbackInfo that it expects to be executed, I decided that we couldn't 
just ignore it, but rather needed to validate that this pendingRollbackInfo is 
the same as the pending rollback instant we just saw in the timeline, and abort 
the rollback if not. 

> Worth considering edge cases where heartbeats could become stale or be missed 
> (e.g., if a job crashes without properly closing its heartbeat). Handling 
> these scenarios gracefully will help ensure that rollbacks can still proceed 
> when needed. 

Yeah, if a failed rollback job doesn't clean up the heartbeat after failure, 
any rollback attempt right after it (within the interval of `[heartbeat timeout 
* (allowed heartbeat misses + 1)]`, I think) will fail. The alternative (that I 
can think of) would be to simply allow the chance of 2+ rollback jobs working on 
the same rollback instant. The issue though is that even if the HoodieTable 
executeRollback implementation prevents the dataset from being corrupted and 
just produces a retry-able failure, I thought it would be noisy/tricky for a 
user to understand/debug. So I decided to add Step 4, since from my 
perspective/understanding it was a tradeoff between "always failing with an 
easy-to-understand retry-able exception" versus "rarely failing with a 
hard-to-understand retry-able exception". 
Though I think the chance of a "stale heartbeat" can be lowered by updating the 
commit code path to clean up the heartbeat if an exception is raised (this isn't 
done in 0.10 I think, but I'm not sure if it's still like this in 0.14). In fact 
my organization internally does this with our internal forked/modified version 
of clustering replacecommit (we have modified replacecommit to not have an 
immutable plan, similar to commit). Though of course this isn't a guarantee, 
since a HUDI spark job writer might fail with a lower-level runtime error.
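
As a sketch of that hardening (doCommitInternal is just a placeholder for 
whatever the actual commit path is, and I'm assuming the heartbeat client 
exposes a stop(instantTime) call; the point is the finally block):

{code:java}
// Sketch: have the writer clean up its heartbeat even when the commit fails, so a
// failed job doesn't block the next rollback attempt for the whole expiry window.
// doCommitInternal(...) is a placeholder, not an actual HUDI method.
try {
  doCommitInternal(instantTime, writeStatuses);
} finally {
  try {
    // Stop the heartbeat for this instant; this is best-effort cleanup only.
    heartbeatClient.stop(instantTime);
  } catch (Exception e) {
    LOG.warn("Failed to clean up heartbeat for " + instantTime, e);
  }
}
{code}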

>  Propose rollback implementation changes to guard against concurrent jobs
> -
>
> Key: HUDI-6596
> URL: 

[jira] [Updated] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-02 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-6596:
---
Description: 
h1. Issue

The existing rollback API in 0.14 
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
 executes a rollback plan, either taking in an existing rollback plan provided 
by the caller from a previous rollback attempt, or scheduling a new rollback 
instant if none is provided. Currently it is not safe for two concurrent jobs 
to call this API (when skipLocking=false and the callers aren't already holding 
a lock), as this can lead to multiple requested rollback plans being created, 
or to two jobs executing the same rollback instant at the same time.
h1. Proposed change

One way to resolve this issue is to refactor this rollback function such that 
if skipLocking=false, the following steps are followed
 # Acquire the table lock
 # Reload the active timeline
 # Look at the active timeline to see if there is an inflight rollback instant 
from a previous rollback attempt; if it exists, then assign this as the 
rollback plan to execute. Also, check if a pending rollback plan was passed in 
by the caller. Then execute the following steps depending on whether the caller 
passed a pending rollback instant plan.
 ##  [a] If a pending inflight rollback plan was passed in by caller, then 
check that there is a previous attempted rollback instant on timeline (and that 
the instant times match) and continue to use this rollback plan. If that isn't 
the case, then raise a rollback exception since this means another job has 
concurrently already executed this plan. Note that in a valid HUDI dataset 
there can be at most one rollback instant for a corresponding commit instant, 
which is why if we no longer see a pending rollback in timeline in this phase 
we can safely assume that it had already been executed to completion.
 ##  [b] If no pending inflight rollback plan was passed in by caller and no 
pending rollback instant was found in timeline earlier, then schedule a new 
rollback plan
 # Now that a rollback plan and requested rollback instant time has been 
assigned, check for an active heartbeat for the rollback instant time. If there 
is one, then abort the rollback as that means there is a concurrent job 
executing that rollback. If not, then start a heartbeat for that rollback 
instant time.
 # Release the table lock
 # Execute the rollback plan and complete the rollback instant. Regardless of 
whether this succeeds or fails with an exception, close the heartbeat. This 
increases the chance that the next job that tries to call this rollback API 
will follow through with the rollback and not abort due to an active previous 
heartbeat

 
 * These steps will only be enforced for  skipLocking=false, since if  
skipLocking=true then that means the caller may already be explicitly holding a 
table lock. In this case, acquiring the lock again in step (1) will fail.
 * Acquiring a lock and reloading timeline for (1-3) will guard against data 
race conditions where another job calls this rollback API at same time and 
schedules its own rollback plan and instant. This is since if no rollback has 
been attempted before for this instant, then before step (1), there is a window 
of time where another concurrent rollback job could have scheduled a rollback 
plan, failed execution, and cleaned up heartbeat, all while the current 
rollback job is running. As a result, even if the current job was passed in an 
empty pending rollback plan, it still needs to check the active timeline to 
ensure that no new rollback pending instant has been created. 
 * Using a heartbeat will signal to other callers in other jobs that there is 
another job already executing this rollback. Checking for an expired heartbeat 
and (re)starting the heartbeat has to be done under the lock, so that multiple 
jobs don't each start it at the same time and assume that they are the only 
ones heartbeating (see the sketch after this list). 
 * The table lock is no longer needed after (5), since it can now be safely 
assumed that no other job (calling this rollback API) will execute this 
rollback instant. 
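
To make step 4 concrete, a minimal sketch of the heartbeat check-and-start that 
runs while still holding the table lock (isHeartbeatActive is a placeholder 
helper and the heartbeatClient call is assumed; these are not exact HUDI 
signatures):

{code:java}
// Still under the table lock from step 1; rollbackInstantTime was fixed in step 3.
// isHeartbeatActive(...) is a placeholder for the expired-heartbeat check.
if (isHeartbeatActive(table.getMetaClient(), rollbackInstantTime)) {
  // Another job is (or very recently was) executing this same rollback instant.
  txnManager.endTransaction();
  throw new HoodieRollbackException("Aborting rollback of " + commitInstantTime
      + ": a concurrent job holds an active heartbeat for " + rollbackInstantTime);
}
// Claim this rollback before releasing the lock (step 4), then proceed to
// step 5 (release the lock) and step 6 (execute the rollback plan).
heartbeatClient.start(rollbackInstantTime);
{code}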

One example implementation to achieve this:

 
{code:java}
@Deprecated
public boolean rollback(final String commitInstantTime,
    Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
    Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
  final Timer.Context timerContext = this.metrics.getRollbackCtx();
  final Option<HoodieInstant> commitInstantOpt;
  final HoodieTable table;
  try {
    table = createTable(config, hadoopConf);
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to initialize table for rollback "
        + config.getBasePath() + " commits " + commitInstantTime, e);
  }
  final String rollbackInstantTime;
  final boolean 

[jira] [Updated] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-02 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-6596:
---
Description: 
h1. Issue

The existing rollback API in 0.14 
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
 executes a rollback plan, either taking in an existing rollback plan provided 
by the caller from a previous rollback attempt, or scheduling a new rollback 
instant if none is provided. Currently it is not safe for two concurrent jobs 
to call this API (when skipLocking=false and the callers aren't already holding 
a lock), as this can lead to multiple requested rollback plans being created, 
or to two jobs executing the same rollback instant at the same time.
h1. Proposed change

One way to resolve this issue is to refactor this rollback function such that 
if skipLocking=false, the following steps are followed
 # Acquire the table lock
 # Reload the active timeline
 # Look at the active timeline to see if there is an inflight rollback instant 
from a previous rollback attempt; if it exists, then assign this as the 
rollback plan to execute (at first). Also, check if a pending rollback plan was 
passed in by the caller. Then execute the following steps depending on whether 
the caller passed a pending rollback instant plan.
 ##  [a] If a pending inflight rollback plan was passed in by caller, then 
check that there is a previous attempted rollback instant on timeline (and that 
the instant times match) and continue to use this rollback plan. If that isn't 
the case, then raise a rollback exception since this means another job has 
concurrently already executed this plan. Note that in a valid HUDI dataset 
there can be at most one rollback instant for a corresponding commit instant, 
which is why if we no longer see a pending rollback in timeline in this phase 
we can safely assume that it had already been executed to completion.
 ##  [b] If no pending inflight rollback plan was passed in by caller and no 
pending rollback instant was found in timeline earlier, then schedule a new 
rollback plan
 # Now that a rollback plan and requested rollback instant time has been 
assigned, check for an active heartbeat for the rollback instant time. If there 
is one, then abort the rollback as that means there is a concurrent job 
executing that rollback. If not, then start a heartbeat for that rollback 
instant time.
 # Release the table lock
 # Execute the rollback plan and complete the rollback instant. Regardless of 
whether this succeeds or fails with an exception, close the heartbeat. This 
increases the chance that the next job that tries to call this rollback API 
will follow through with the rollback and not abort due to an active previous 
heartbeat

 
 * These steps will only be enforced for  skipLocking=false, since if  
skipLocking=true then that means the caller may already be explicitly holding a 
table lock. In this case, acquiring the lock again in step (1) will fail.
 * Acquiring a lock and reloading timeline for (1-3) will guard against data 
race conditions where another job calls this rollback API at same time and 
schedules its own rollback plan and instant. This is since if no rollback has 
been attempted before for this instant, then before step (1), there is a window 
of time where another concurrent rollback job could have scheduled a rollback 
plan, failed execution, and cleaned up heartbeat, all while the current 
rollback job is running. As a result, even if the current job was passed in an 
empty pending rollback plan, it still needs to check the active timeline to 
ensure that no new rollback pending instant has been created. 
 * Using a heartbeat will signal to other callers in other jobs that there is 
another job already executing this rollback. Checking for expired heartbeat and 
(re)-starting the heartbeat has to be done under a lock, so that multiple jobs 
don't each start it at the same time and assume that they are the only ones 
that are heartbeating. 
 * The table lock is no longer needed after (5), since it can now be safely 
assumed that no other job (calling this rollback API) will execute this 
rollback instant. 

One example implementation to achieve this:

 
{code:java}
@Deprecated
public boolean rollback(final String commitInstantTime,
    Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
    Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
  final Timer.Context timerContext = this.metrics.getRollbackCtx();
  final Option<HoodieInstant> commitInstantOpt;
  final HoodieTable table;
  try {
    table = createTable(config, hadoopConf);
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to initialize table for rollback "
        + config.getBasePath() + " commits " + commitInstantTime, e);
  }
  final String rollbackInstantTime;
  final boolean 

[jira] [Updated] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-02 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-6596:
---
Description: 
h1. Issue

The existing rollback API in 0.14 
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
 executes a rollback plan, either taking in an existing rollback plan provided 
by the caller from a previous rollback attempt, or scheduling a new rollback 
instant if none is provided. Currently it is not safe for two concurrent jobs 
to call this API (when skipLocking=false and the callers aren't already holding 
a lock), as this can lead to multiple requested rollback plans being created, 
or to two jobs executing the same rollback instant at the same time.
h1. Proposed change

One way to resolve this issue is to refactor this rollback function such that 
if skipLocking=false, the following steps are followed
 # Acquire the table lock
 # Reload the active timeline
 # Look at the active timeline to see if there is an inflight rollback instant 
from a previous rollback attempt; if it exists, then assign this as the 
rollback plan to execute. Also, check if a pending rollback plan was passed in 
by the caller. Then execute the following steps depending on whether the caller 
passed a pending rollback instant plan.
 ##  [a] If a pending inflight rollback plan was passed in by caller, then 
check that there is a previous attempted rollback instant on timeline (and that 
the instant times match) and continue to use this rollback plan. If that isn't 
the case, then raise a rollback exception since this means another job has 
concurrently already executed this plan. Note that in a valid HUDI dataset 
there can be at most one rollback instant for a corresponding commit instant, 
which is why if we no longer see a pending rollback in timeline in this phase 
we can safely assume that it had already been executed to completion.
 ##  [b] If no pending inflight rollback plan was passed in by caller and no 
pending rollback instant was found in timeline earlier, then schedule a new 
rollback plan
 # Now that a rollback plan and requested rollback instant time has been 
assigned, check for an active heartbeat for the rollback instant time. If there 
is one, then abort the rollback as that means there is a concurrent job 
executing that rollback. If not, then start a heartbeat for that rollback 
instant time.
 # Release the table lock
 # Execute the rollback plan and complete the rollback instant. Regardless of 
whether this succeeds or fails with an exception, close the heartbeat. This 
increases the chance that the next job that tries to call this rollback API 
will follow through with the rollback and not abort due to an active previous 
heartbeat

 
 * These steps will only be enforced for  skipLocking=false, since if  
skipLocking=true then that means the caller may already be explicitly holding a 
table lock. In this case, acquiring the lock again in step (1) will fail.
 * Acquiring a lock and reloading timeline for (1-3) will guard against data 
race conditions where another job calls this rollback API at same time and 
schedules its own rollback plan and instant. This is since if no rollback has 
been attempted before for this instant, then before step (1), there is a window 
of time where another concurrent rollback job could have scheduled a rollback 
plan, failed execution, and cleaned up heartbeat, all while the current 
rollback job is running. As a result, even if the current job was passed in an 
empty pending rollback plan, it still needs to check the active timeline to 
ensure that no new rollback pending instant has been created. 
 * Using a heartbeat will signal to other callers in other jobs that there is 
another job already executing this rollback. Checking for expired heartbeat and 
(re)-starting the heartbeat has to be done under a lock, so that multiple jobs 
don't each start it at the same time and assume that they are the only ones 
that are heartbeating. 
 * The table lock is no longer needed after (5), since it can now be safely 
assumed that no other job (calling this rollback API) will execute this 
rollback instant. 

One example implementation to achieve this:

 
{code:java}
@Deprecated
public boolean rollback(final String commitInstantTime,
    Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
    Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
  final Timer.Context timerContext = this.metrics.getRollbackCtx();
  final Option<HoodieInstant> commitInstantOpt;
  final HoodieTable table;
  try {
    table = createTable(config, hadoopConf);
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to initialize table for rollback "
        + config.getBasePath() + " commits " + commitInstantTime, e);
  }
  final String rollbackInstantTime;
  final boolean 

[jira] [Updated] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-02 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-6596:
---
Description: 
h1. Issue

The existing rollback API in 0.14 
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
 executes a rollback plan, either taking in an existing rollback plan provided 
by the caller from a previous rollback attempt, or scheduling a new rollback 
instant if none is provided. Currently it is not safe for two concurrent jobs 
to call this API (when skipLocking=false and the callers aren't already holding 
a lock), as this can lead to multiple requested rollback plans being created, 
or to two jobs executing the same rollback instant at the same time.
h1. Proposed change

One way to resolve this issue is to refactor this rollback function such that 
if skipLocking=false, the following steps are followed
 # Acquire the table lock
 # Reload the active timeline
 # Look at the active timeline to see if there is an inflight rollback instant 
from a previous rollback attempt; if it exists, then assign this as the 
rollback plan to execute. Also, check if a pending rollback plan was passed in 
by the caller. Then execute the following steps depending on whether the caller 
passed a pending rollback instant plan.
 ##  [a] If a pending inflight rollback plan was passed in by caller, then 
check that there is a previous attempted rollback instant on timeline (and that 
the instant times match) and continue to use this rollback plan. If that isn't 
the case, then raise a rollback exception since this means another job has 
concurrently already executed this plan. Note that in a valid HUDI dataset 
there can be at most one rollback instant for a corresponding commit instant, 
which is why if we no longer see a pending rollback in timeline in this phase 
we can safely assume that it had already been executed to completion.
 ##  [b] If no pending inflight rollback plan was passed in by caller then 
schedule a new rollback plan if no pending rollback instant was found in 
timeline earlier.
 # Now that a rollback plan and requested rollback instant time has been 
assigned, check for an active heartbeat for the rollback instant time. If there 
is one, then abort the rollback as that means there is a concurrent job 
executing that rollback. If not, then start a heartbeat for that rollback 
instant time.
 # Release the table lock
 # Execute the rollback plan and complete the rollback instant. Regardless of 
whether this succeeds or fails with an exception, close the heartbeat. This 
increases the chance that the next job that tries to call this rollback API 
will follow through with the rollback and not abort due to an active previous 
heartbeat

 
 * These steps will only be enforced for  skipLocking=false, since if  
skipLocking=true then that means the caller may already be explicitly holding a 
table lock. In this case, acquiring the lock again in step (1) will fail.
 * Acquiring a lock and reloading timeline for (1-3) will guard against data 
race conditions where another job calls this rollback API at same time and 
schedules its own rollback plan and instant. This is since if no rollback has 
been attempted before for this instant, then before step (1), there is a window 
of time where another concurrent rollback job could have scheduled a rollback 
plan, failed execution, and cleaned up heartbeat, all while the current 
rollback job is running. As a result, even if the current job was passed in an 
empty pending rollback plan, it still needs to check the active timeline to 
ensure that no new rollback pending instant has been created. 
 * Using a heartbeat will signal to other callers in other jobs that there is 
another job already executing this rollback. Checking for expired heartbeat and 
(re)-starting the heartbeat has to be done under a lock, so that multiple jobs 
don't each start it at the same time and assume that they are the only ones 
that are heartbeating. 
 * The table lock is no longer needed after (5), since it can now be safely 
assumed that no other job (calling this rollback API) will execute this 
rollback instant. 

One example implementation to achieve this:

 
{code:java}
@Deprecated
public boolean rollback(final String commitInstantTime,
    Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
    Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
  final Timer.Context timerContext = this.metrics.getRollbackCtx();
  final Option<HoodieInstant> commitInstantOpt;
  final HoodieTable table;
  try {
    table = createTable(config, hadoopConf);
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to initialize table for rollback "
        + config.getBasePath() + " commits " + commitInstantTime, e);
  }
  final String rollbackInstantTime;
  final boolean 

[jira] [Updated] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-08-02 Thread Krishen Bhan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-6596:
---
Description: 
h1. Issue

The existing rollback API in 0.14 
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
 executes a rollback plan, either taking in an existing rollback plan provided 
by the caller from a previous rollback attempt, or scheduling a new rollback 
instant if none is provided. Currently it is not safe for two concurrent jobs 
to call this API (when skipLocking=false and the callers aren't already holding 
a lock), as this can lead to multiple requested rollback plans being created, 
or to two jobs executing the same rollback instant at the same time.
h1. Proposed change

One way to resolve this issue is to refactor this rollback function such that 
if skipLocking=false, the following steps are followed
 # Acquire the table lock
 # Reload the active timeline
 # Look at the active timeline to see if there is an inflight rollback instant 
from a previous rollback attempt; if it exists, then assign this as the 
rollback plan to execute. Also, check if a pending rollback plan was passed in 
by the caller. Then execute the following steps depending on whether the caller 
passed a pending rollback instant plan.
 ##  [a] If a pending inflight rollback plan was passed in by caller, then 
check that there is a previous attempted rollback instant on timeline (and that 
the instant times match) and continue to use this rollback plan. If that isn't 
the case, then raise a rollback exception since this means another job has 
concurrently already executed this plan. Note that in a valid HUDI dataset 
there can be at most one rollback instant for a corresponding commit instant, 
which is why if we no longer see a pending rollback in timeline in this phase 
we can safely assume that it had already been executed to completion.
 ##  [b] If no pending inflight rollback plan was passed in by caller then 
schedule a new rollback plan if no pending rollback instant was found in 
timeline earlier.
 # Now that a rollback plan and requested rollback instant time has been 
assigned, check for an active heartbeat for the rollback instant time. If there 
is one, then abort the rollback as that means there is a concurrent job 
executing that rollback. If not, then start a heartbeat for that rollback 
instant time.
 # Release the table lock
 # Execute the rollback plan and complete the rollback instant. Whether this 
succeeds or fails with an exception, close the heartbeat. This increases the 
chance that the next job that tries to call this rollback API will follow 
through with the rollback and not abort due to an active previous heartbeat

 
 * These steps will only be enforced for  skipLocking=false, since if  
skipLocking=true then that means the caller may already be explicitly holding a 
table lock. In this case, acquiring the lock again in step (1) will fail.
 * Acquiring a lock and reloading timeline for (1-3) will guard against data 
race conditions where another job calls this rollback API at same time and 
schedules its own rollback plan and instant. This is since if no rollback has 
been attempted before for this instant, then before step (1), there is a window 
of time where another concurrent rollback job could have scheduled a rollback 
plan, failed execution, and cleaned up heartbeat, all while the current 
rollback job is running. As a result, even if the current job was passed in an 
empty pending rollback plan, it still needs to check the active timeline to 
ensure that no new rollback pending instant has been created. 
 * Using a heartbeat will signal to other callers in other jobs that there is 
another job already executing this rollback. Checking for expired heartbeat and 
(re)-starting the heartbeat has to be done under a lock, so that multiple jobs 
don't each start it at the same time and assume that they are the only ones 
that are heartbeating. 
 * The table lock is no longer needed after (5), since it can now be safely 
assumed that no other job (calling this rollback API) will execute this 
rollback instant. 

One example implementation to achieve this:

 
{code:java}
@Deprecated
public boolean rollback(final String commitInstantTime,
    Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
    Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
  final Timer.Context timerContext = this.metrics.getRollbackCtx();
  final Option<HoodieInstant> commitInstantOpt;
  final HoodieTable table;
  try {
    table = createTable(config, hadoopConf);
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to initialize table for rollback "
        + config.getBasePath() + " commits " + commitInstantTime, e);
  }
  final String rollbackInstantTime;
  final boolean deleteInstantsDuringRollback;
 

[jira] [Created] (HUDI-6596) Propose rollback implementation changes to guard against concurrent jobs

2023-07-26 Thread Krishen Bhan (Jira)
Krishen Bhan created HUDI-6596:
--

 Summary:  Propose rollback implementation changes to guard against 
concurrent jobs
 Key: HUDI-6596
 URL: https://issues.apache.org/jira/browse/HUDI-6596
 Project: Apache Hudi
  Issue Type: Wish
Reporter: Krishen Bhan


h1. Issue

The existing rollback API in 0.14 
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
 executes a rollback plan, either taking in an existing rollback plan provided 
by the caller from a previous rollback attempt, or scheduling a new rollback 
instant if none is provided. Currently it is not safe for two concurrent jobs 
to call this API (when skipLocking=false and the callers aren't already holding 
a lock), as this can lead to multiple requested rollback plans being created, 
or to two jobs executing the same rollback instant at the same time.

h1. Proposed change

One way to resolve this issue is to refactor this rollback function such that 
if skipLocking=false, the following steps are followed
 # Acquire the table lock
 # Reload the active timeline
 # Look at the active timeline to see if there is an inflight rollback instant 
from a previous rollback attempt; if it exists, then assign this as the 
rollback plan to execute. Also, check if a pending rollback plan was passed in 
by the caller. Then make the following checks:
 ##  [a] If a pending inflight rollback plan was passed in by caller, then 
check that there is a previous attempted rollback instant on timeline (and that 
the instant times match) and use this rollback plan. If that isn't the case, 
then raise a rollback exception since this means another job has concurrently 
already executed this plan.
 ##  [b] If no pending inflight rollback plan was passed in by caller, then 
schedule a new rollback plan if an existing one wasn't found in active timeline.
 # Now that a rollback plan and requested rollback instant time has been 
assigned, check for an active heartbeat for the rollback instant time. If there 
is one, then abort the rollback as that means there is a concurrent job 
executing that rollback. If not, then start a heartbeat for that rollback 
instant time.
 # Release the table lock
 # Execute the rollback plan and complete the rollback instant. Whether this 
succeeds or fails with an exception, close the heartbeat. This increases the 
chance that the next job that tries to call this rollback API will follow 
through with the rollback and not abort due to an active previous heartbeat

 
 * These steps will only be enforced for  skipLocking=false, since if  
skipLocking=true then that means the caller may already be explicitly holding a 
table lock. In this case, acquiring the lock again in step (1) will fail.
 * Acquiring a lock and reloading timeline for (1-3) will guard against data 
race conditions where another job calls this rollback API at same time and 
schedules its own rollback plan and instant. This is since if no rollback has 
been attempted before for this instant, then before step (1), there is a window 
of time where another concurrent rollback job could have scheduled a rollback 
plan, failed execution, and cleaned up heartbeat, all while the current 
rollback job is running. As a result, even if the current job was passed in an 
empty pending rollback plan, it still needs to check the active timeline to 
ensure that no new rollback pending instant has been created. 
 * Using a heartbeat will signal to other callers in other jobs that there is 
another job already executing this rollback. Checking for expired heartbeat and 
(re)-starting the heartbeat has to be done under a lock, so that multiple jobs 
don't each start it at the same time and assume that they are the only ones 
that are heartbeating. 
 * The table lock is no longer needed after (5), since it can now be safely 
assumed that no other job (calling this rollback API) will execute this 
rollback instant. 

One example implementation to achieve this:

 
{code:java}
@Deprecated
public boolean rollback(final String commitInstantTime,
    Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
    Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
  final Timer.Context timerContext = this.metrics.getRollbackCtx();
  final Option<HoodieInstant> commitInstantOpt;
  final HoodieTable table;
  try {
    table = createTable(config, hadoopConf);
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to initialize table for rollback "
        + config.getBasePath() + " commits " + commitInstantTime, e);
  }
  final String rollbackInstantTime;
  final boolean deleteInstantsDuringRollback;
  final HoodieInstant instantToRollback;
  try {
    if (!skipLocking) {
      // Do step 1 and 2
      txnManager.beginTransaction();
      table.getMetaClient().reloadActiveTimeline();
    }
final