[ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-6596:
-------------------------------
    Description: 
h1. Issue

The existing rollback API in 0.14 
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
 executes a rollback plan, either taking in an existing rollback plan provided 
by the caller for a previous rollback or attempt, or scheduling a new rollback 
instant if none is provided. Currently it is not safe for two concurrent jobs 
to call this API (when skipLocking=False and the callers aren't already holding 
a lock), as this can lead to an issue where multiple rollback requested plans 
are created or two jobs are executing the same rollback instant at the same 
time.
h1. Proposed change

One way to resolve this issue is to refactor this rollback function such that 
if skipLocking=false, the following steps are followed
 # Acquire the table lock
 # Reload the active timeline
 # Look at the active timeline to see if there is a inflight rollback instant 
from a previous rollback attempt, if it exists then assign this is as the 
rollback plan to execute. Also, check if a pending rollback plan was passed in 
by caller. Then it executes the following steps depending on whether the caller 
passed a pending rollback instant plan
 ##  [a] If a pending inflight rollback plan was passed in by caller, then 
check that there is a previous attempted rollback instant on timeline (and that 
the instant times match) and continue to use this rollback plan. If that isn't 
the case, then raise a rollback exception since this means another job has 
concurrently already executed this plan. Note that in a valid HUDI dataset 
there can be at most one rollback instant for a corresponding commit instant, 
which is why if we no longer see a pending rollback in timeline in this phase 
we can safely assume that it had already been executed to completion.
 ##  [b] If no pending inflight rollback plan was passed in by caller then 
schedule a new rollback plan if no pending rollback instant was found in 
timeline earlier.
 # Now that a rollback plan and requested rollback instant time has been 
assigned, check for an active heartbeat for the rollback instant time. If there 
is one, then abort the rollback as that means there is a concurrent job 
executing that rollback. If not, then start a heartbeat for that rollback 
instant time.
 # Release the table lock
 # Execute the rollback plan and complete the rollback instant. Regardless of 
whether this succeeds or fails with an exception, close the heartbeat. This 
increases the chance that the next job that tries to call this rollback API 
will follow through with the rollback and not abort due to an active previous 
heartbeat

 
 * These steps will only be enforced for  skipLocking=false, since if  
skipLocking=true then that means the caller may already be explicitly holding a 
table lock. In this case, acquiring the lock again in step (1) will fail.
 * Acquiring a lock and reloading timeline for (1-3) will guard against data 
race conditions where another job calls this rollback API at same time and 
schedules its own rollback plan and instant. This is since if no rollback has 
been attempted before for this instant, then before step (1), there is a window 
of time where another concurrent rollback job could have scheduled a rollback 
plan, failed execution, and cleaned up heartbeat, all while the current 
rollback job is running. As a result, even if the current job was passed in an 
empty pending rollback plan, it still needs to check the active timeline to 
ensure that no new rollback pending instant has been created. 
 * Using a heartbeat will signal to other callers in other jobs that there is 
another job already executing this rollback. Checking for expired heartbeat and 
(re)-starting the heartbeat has to be done under a lock, so that multiple jobs 
don't each start it at the same time and assume that they are the only ones 
that are heartbeating. 
 * The table lock is no longer needed after (5), since it can now be safely 
assumed that no other job (calling this rollback API) will execute this 
rollback instant. 

One example implementation to achieve this:

 
{code:java}
@Deprecated
public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
    Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
  final Timer.Context timerContext = this.metrics.getRollbackCtx();
  final Option<HoodieInstant> commitInstantOpt;
  final HoodieTable<T, I, K, O> table;
  try {
    table = createTable(config, hadoopConf);
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to initalize table for rollback " 
+ config.getBasePath() + " commits " + commitInstantTime, e);
  }
  final String rollbackInstantTime;
  final boolean deleteInstantsDuringRollback;
  final HoodieInstant instantToRollback;
  try {
    if (!skipLocking) {
      // Do step 1 and 2
      txnManager.beginTransaction();
      table.getMetaClient().reloadActiveTimeline();
    }
    final Option<HoodiePendingRollbackInfo> previousAttemptedRollback;
    if (skipLocking) {
      // If skipLocking = true, then there directly use pendingRollbackInfo 
without checking the status of this rollback instant on active timeline
      // This is since the caller is responsible for ensuring there is no 
concurrent rollback
      previousAttemptedRollback = pendingRollbackInfo;
    } else {
      // step 3
      // If skipLocking = false, we need to check the timeline for the latest 
pending rollback, in case a concurrent rollback before
      // step 1 has already executed pendingRollbackInfo
      previousAttemptedRollback = getPendingRollbackInfo(table.getMetaClient(), 
commitInstantTime, false);
      if (pendingRollbackInfo.isPresent()) {
        // step 3a If a pendingRollbackInfo was passed in, verify that it is 
the same as the pending rollback that was just observed. If not, then
        // abort the rollback
        previousAttemptedRollback.orElseThrow(
            () -> new HoodieRollbackException(
                String.format("Pending rollback instant %s no longer inflight", 
pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
            )
        );
        // This will only fail if the table is in an illegal state, where there 
are 2+ rollback plans for one instant. This
        // check shouldn't be necessary, but just keeping it here for now to 
demonstrate
        
ValidationUtils.checkArgument(previousAttemptedRollback.get().getRollbackInstant().getTimestamp().equals(
            pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
        );
      }
    }
    rollbackInstantTime = previousAttemptedRollback
        .map(pendingRollback -> 
pendingRollback.getRollbackInstant().getTimestamp())
        .orElse(rollbackInstantTimeOpt.orElseGet(() -> 
HoodieActiveTimeline.createNewInstantTime()));
    commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
        .filter(instant -> 
HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
        .findFirst());
    LOG.info(String.format("Begin rollback of instant %s at instantTime %s", 
commitInstantTime, rollbackInstantTime));
    LOG.info(String.format("Scheduling Rollback at instant time : %s "
            + "(exists in active timeline: %s), with rollback plan: %s",
        rollbackInstantTime, commitInstantOpt.isPresent(), 
previousAttemptedRollback.isPresent()));
    if (previousAttemptedRollback.isPresent()) {
      if (commitInstantOpt.isPresent()) {
        instantToRollback = commitInstantOpt.get();
        deleteInstantsDuringRollback = true;
      } else {
        // A previous pending rollback plan still needs to be executed and 
completed even if the instant to rollback
        // is no longer in active timeline. This can be safely done by 
re-creating the instant to rollback and
        // configuring the rollback execution later on to not delete the 
instants during rollback.
        instantToRollback = new HoodieInstant(
            true, 
previousAttemptedRollback.get().getRollbackPlan().getInstantToRollback().getAction(),
 commitInstantTime);
        deleteInstantsDuringRollback = false;
      }
    } else {
      // Step 3b
      // A new rollback can only be scheduled if the commit to rollback is 
still in the active timeline
      if (!commitInstantOpt.isPresent()) {
        LOG.warn("Cannot find instant " + commitInstantTime + " in the 
timeline, for rollback");
        return false;
      }
      instantToRollback = commitInstantOpt.get();
      deleteInstantsDuringRollback = true;
      Option<HoodieRollbackPlan> newRollbackPlanOption =
          table.scheduleRollback(context, rollbackInstantTime, 
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
      newRollbackPlanOption.orElseThrow(() -> new HoodieRollbackException(
          String.format("Failed to schedule rollback of %s at instant time %s", 
commitInstantTime, rollbackInstantTime))
      );
    }
    // Step 4
    // This heartbeating logic should/will only be triggered if skipLocking = 
false. If
    // the  rollback instant time has just been newly scheduled these heartbeat 
checks will still correctly
    // show the (non-existent) heartbeat as expired
    if (!skipLocking) {
      try {
        if (heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
          heartbeatClient.stop(rollbackInstantTime);
        } else {
          throw new HoodieRollbackException(String.format("Cannot execute 
rollback instant %s due to active heartbeat", rollbackInstantTime);
        }
        heartbeatClient.start(rollbackInstantTime);
      } catch (IOException e) {
        throw new HoodieRollbackException(String.format("Could not access last 
heartbeat for %s", rollbackInstantTime);
      }
    }
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to use/create rollback plan for" 
+ config.getBasePath() + " commits " + commitInstantTime, e);
  } finally {
    // Step 5
    if (!skipLocking) {
      txnManager.endTransaction();
    }
  }
 // Step 6
  try {
    HoodieRollbackMetadata rollbackMetadata = table.rollback(context, 
rollbackInstantTime, instantToRollback, deleteInstantsDuringRollback, 
skipLocking);
    if (timerContext != null) {
      long durationInMs = metrics.getDurationInMs(timerContext.stop());
      metrics.updateRollbackMetrics(durationInMs, 
rollbackMetadata.getTotalFilesDeleted());
    }
    return true;
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to execute rollback " + 
config.getBasePath() + " commits " + commitInstantTime, e);
  } finally {
    if (!skipLocking) {
      heartbeatClient.stop(rollbackInstantTime);
    }
  }
}{code}
 

 
h2. Why might this change be useful?

Although these scenarios can be resolved at the application/orchestration level 
rather than HUDI, we are still working on this fix in our internal deployment 
of HUDI since we want to avoid edge cases where 2+ jobs can call this rollback 
API for the same instant at the same time.

 

  was:
h1. Issue

The existing rollback API in 0.14 
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
 executes a rollback plan, either taking in an existing rollback plan provided 
by the caller for a previous rollback or attempt, or scheduling a new rollback 
instant if none is provided. Currently it is not safe for two concurrent jobs 
to call this API (when skipLocking=False and the callers aren't already holding 
a lock), as this can lead to an issue where multiple rollback requested plans 
are created or two jobs are executing the same rollback instant at the same 
time.
h1. Proposed change

One way to resolve this issue is to refactor this rollback function such that 
if skipLocking=false, the following steps are followed
 # Acquire the table lock
 # Reload the active timeline
 # Look at the active timeline to see if there is a inflight rollback instant 
from a previous rollback attempt, if it exists then assign this is as the 
rollback plan to execute. Also, check if a pending rollback plan was passed in 
by caller. Then it executes the following steps depending on whether the caller 
passed a pending rollback instant plan
 ##  [a] If a pending inflight rollback plan was passed in by caller, then 
check that there is a previous attempted rollback instant on timeline (and that 
the instant times match) and continue to use this rollback plan. If that isn't 
the case, then raise a rollback exception since this means another job has 
concurrently already executed this plan. Note that in a valid HUDI dataset 
there can be at most one rollback instant for a corresponding commit instant, 
which is why if we no longer see a pending rollback in timeline in this phase 
we can safely assume that it had already been executed to completion.
 ##  [b] If no pending inflight rollback plan was passed in by caller then 
schedule a new rollback plan if no pending rollback instant was found in 
timeline earlier.
 # Now that a rollback plan and requested rollback instant time has been 
assigned, check for an active heartbeat for the rollback instant time. If there 
is one, then abort the rollback as that means there is a concurrent job 
executing that rollback. If not, then start a heartbeat for that rollback 
instant time.
 # Release the table lock
 # Execute the rollback plan and complete the rollback instant. Whether this 
succeeds or fails with an exception, close the heartbeat. This increases the 
chance that the next job that tries to call this rollback API will follow 
through with the rollback and not abort due to an active previous heartbeat

 
 * These steps will only be enforced for  skipLocking=false, since if  
skipLocking=true then that means the caller may already be explicitly holding a 
table lock. In this case, acquiring the lock again in step (1) will fail.
 * Acquiring a lock and reloading timeline for (1-3) will guard against data 
race conditions where another job calls this rollback API at same time and 
schedules its own rollback plan and instant. This is since if no rollback has 
been attempted before for this instant, then before step (1), there is a window 
of time where another concurrent rollback job could have scheduled a rollback 
plan, failed execution, and cleaned up heartbeat, all while the current 
rollback job is running. As a result, even if the current job was passed in an 
empty pending rollback plan, it still needs to check the active timeline to 
ensure that no new rollback pending instant has been created. 
 * Using a heartbeat will signal to other callers in other jobs that there is 
another job already executing this rollback. Checking for expired heartbeat and 
(re)-starting the heartbeat has to be done under a lock, so that multiple jobs 
don't each start it at the same time and assume that they are the only ones 
that are heartbeating. 
 * The table lock is no longer needed after (5), since it can now be safely 
assumed that no other job (calling this rollback API) will execute this 
rollback instant. 

One example implementation to achieve this:

 
{code:java}
@Deprecated
public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
    Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
  final Timer.Context timerContext = this.metrics.getRollbackCtx();
  final Option<HoodieInstant> commitInstantOpt;
  final HoodieTable<T, I, K, O> table;
  try {
    table = createTable(config, hadoopConf);
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to initalize table for rollback " 
+ config.getBasePath() + " commits " + commitInstantTime, e);
  }
  final String rollbackInstantTime;
  final boolean deleteInstantsDuringRollback;
  final HoodieInstant instantToRollback;
  try {
    if (!skipLocking) {
      // Do step 1 and 2
      txnManager.beginTransaction();
      table.getMetaClient().reloadActiveTimeline();
    }
    final Option<HoodiePendingRollbackInfo> previousAttemptedRollback;
    if (skipLocking) {
      // If skipLocking = true, then there directly use pendingRollbackInfo 
without checking the status of this rollback instant on active timeline
      // This is since the caller is responsible for ensuring there is no 
concurrent rollback
      previousAttemptedRollback = pendingRollbackInfo;
    } else {
      // step 3
      // If skipLocking = false, we need to check the timeline for the latest 
pending rollback, in case a concurrent rollback before
      // step 1 has already executed pendingRollbackInfo
      previousAttemptedRollback = getPendingRollbackInfo(table.getMetaClient(), 
commitInstantTime, false);
      if (pendingRollbackInfo.isPresent()) {
        // step 3a If a pendingRollbackInfo was passed in, verify that it is 
the same as the pending rollback that was just observed. If not, then
        // abort the rollback
        previousAttemptedRollback.orElseThrow(
            () -> new HoodieRollbackException(
                String.format("Pending rollback instant %s no longer inflight", 
pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
            )
        );
        // This will only fail if the table is in an illegal state, where there 
are 2+ rollback plans for one instant. This
        // check shouldn't be necessary, but just keeping it here for now to 
demonstrate
        
ValidationUtils.checkArgument(previousAttemptedRollback.get().getRollbackInstant().getTimestamp().equals(
            pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
        );
      }
    }
    rollbackInstantTime = previousAttemptedRollback
        .map(pendingRollback -> 
pendingRollback.getRollbackInstant().getTimestamp())
        .orElse(rollbackInstantTimeOpt.orElseGet(() -> 
HoodieActiveTimeline.createNewInstantTime()));
    commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
        .filter(instant -> 
HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
        .findFirst());
    LOG.info(String.format("Begin rollback of instant %s at instantTime %s", 
commitInstantTime, rollbackInstantTime));
    LOG.info(String.format("Scheduling Rollback at instant time : %s "
            + "(exists in active timeline: %s), with rollback plan: %s",
        rollbackInstantTime, commitInstantOpt.isPresent(), 
previousAttemptedRollback.isPresent()));
    if (previousAttemptedRollback.isPresent()) {
      if (commitInstantOpt.isPresent()) {
        instantToRollback = commitInstantOpt.get();
        deleteInstantsDuringRollback = true;
      } else {
        // A previous pending rollback plan still needs to be executed and 
completed even if the instant to rollback
        // is no longer in active timeline. This can be safely done by 
re-creating the instant to rollback and
        // configuring the rollback execution later on to not delete the 
instants during rollback.
        instantToRollback = new HoodieInstant(
            true, 
previousAttemptedRollback.get().getRollbackPlan().getInstantToRollback().getAction(),
 commitInstantTime);
        deleteInstantsDuringRollback = false;
      }
    } else {
      // Step 3b
      // A new rollback can only be scheduled if the commit to rollback is 
still in the active timeline
      if (!commitInstantOpt.isPresent()) {
        LOG.warn("Cannot find instant " + commitInstantTime + " in the 
timeline, for rollback");
        return false;
      }
      instantToRollback = commitInstantOpt.get();
      deleteInstantsDuringRollback = true;
      Option<HoodieRollbackPlan> newRollbackPlanOption =
          table.scheduleRollback(context, rollbackInstantTime, 
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
      newRollbackPlanOption.orElseThrow(() -> new HoodieRollbackException(
          String.format("Failed to schedule rollback of %s at instant time %s", 
commitInstantTime, rollbackInstantTime))
      );
    }
    // Step 4
    // This heartbeating logic should/will only be triggered if skipLocking = 
false. If
    // the  rollback instant time has just been newly scheduled these heartbeat 
checks will still correctly
    // show the (non-existent) heartbeat as expired
    if (!skipLocking) {
      try {
        if (heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
          heartbeatClient.stop(rollbackInstantTime);
        } else {
          throw new HoodieRollbackException(String.format("Cannot execute 
rollback instant %s due to active heartbeat", rollbackInstantTime);
        }
        heartbeatClient.start(rollbackInstantTime);
      } catch (IOException e) {
        throw new HoodieRollbackException(String.format("Could not access last 
heartbeat for %s", rollbackInstantTime);
      }
    }
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to use/create rollback plan for" 
+ config.getBasePath() + " commits " + commitInstantTime, e);
  } finally {
    // Step 5
    if (!skipLocking) {
      txnManager.endTransaction();
    }
  }
 // Step 6
  try {
    HoodieRollbackMetadata rollbackMetadata = table.rollback(context, 
rollbackInstantTime, instantToRollback, deleteInstantsDuringRollback, 
skipLocking);
    if (timerContext != null) {
      long durationInMs = metrics.getDurationInMs(timerContext.stop());
      metrics.updateRollbackMetrics(durationInMs, 
rollbackMetadata.getTotalFilesDeleted());
    }
    return true;
  } catch (Exception e) {
    throw new HoodieRollbackException("Failed to execute rollback " + 
config.getBasePath() + " commits " + commitInstantTime, e);
  } finally {
    if (!skipLocking) {
      heartbeatClient.stop(rollbackInstantTime);
    }
  }
}{code}
 

 
h2. Why might this change be useful?

Although these scenarios can be resolved at the application/orchestration level 
rather than HUDI, we are still working on this fix in our internal deployment 
of HUDI since we want to avoid edge cases where 2+ jobs can call this rollback 
API for the same instant at the same time.

 


>  Propose rollback implementation changes to guard against concurrent jobs
> -------------------------------------------------------------------------
>
>                 Key: HUDI-6596
>                 URL: https://issues.apache.org/jira/browse/HUDI-6596
>             Project: Apache Hudi
>          Issue Type: Wish
>            Reporter: Krishen Bhan
>            Priority: Trivial
>
> h1. Issue
> The existing rollback API in 0.14 
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
>  executes a rollback plan, either taking in an existing rollback plan 
> provided by the caller for a previous rollback or attempt, or scheduling a 
> new rollback instant if none is provided. Currently it is not safe for two 
> concurrent jobs to call this API (when skipLocking=False and the callers 
> aren't already holding a lock), as this can lead to an issue where multiple 
> rollback requested plans are created or two jobs are executing the same 
> rollback instant at the same time.
> h1. Proposed change
> One way to resolve this issue is to refactor this rollback function such that 
> if skipLocking=false, the following steps are followed
>  # Acquire the table lock
>  # Reload the active timeline
>  # Look at the active timeline to see if there is a inflight rollback instant 
> from a previous rollback attempt, if it exists then assign this is as the 
> rollback plan to execute. Also, check if a pending rollback plan was passed 
> in by caller. Then it executes the following steps depending on whether the 
> caller passed a pending rollback instant plan
>  ##  [a] If a pending inflight rollback plan was passed in by caller, then 
> check that there is a previous attempted rollback instant on timeline (and 
> that the instant times match) and continue to use this rollback plan. If that 
> isn't the case, then raise a rollback exception since this means another job 
> has concurrently already executed this plan. Note that in a valid HUDI 
> dataset there can be at most one rollback instant for a corresponding commit 
> instant, which is why if we no longer see a pending rollback in timeline in 
> this phase we can safely assume that it had already been executed to 
> completion.
>  ##  [b] If no pending inflight rollback plan was passed in by caller then 
> schedule a new rollback plan if no pending rollback instant was found in 
> timeline earlier.
>  # Now that a rollback plan and requested rollback instant time has been 
> assigned, check for an active heartbeat for the rollback instant time. If 
> there is one, then abort the rollback as that means there is a concurrent job 
> executing that rollback. If not, then start a heartbeat for that rollback 
> instant time.
>  # Release the table lock
>  # Execute the rollback plan and complete the rollback instant. Regardless of 
> whether this succeeds or fails with an exception, close the heartbeat. This 
> increases the chance that the next job that tries to call this rollback API 
> will follow through with the rollback and not abort due to an active previous 
> heartbeat
>  
>  * These steps will only be enforced for  skipLocking=false, since if  
> skipLocking=true then that means the caller may already be explicitly holding 
> a table lock. In this case, acquiring the lock again in step (1) will fail.
>  * Acquiring a lock and reloading timeline for (1-3) will guard against data 
> race conditions where another job calls this rollback API at same time and 
> schedules its own rollback plan and instant. This is since if no rollback has 
> been attempted before for this instant, then before step (1), there is a 
> window of time where another concurrent rollback job could have scheduled a 
> rollback plan, failed execution, and cleaned up heartbeat, all while the 
> current rollback job is running. As a result, even if the current job was 
> passed in an empty pending rollback plan, it still needs to check the active 
> timeline to ensure that no new rollback pending instant has been created. 
>  * Using a heartbeat will signal to other callers in other jobs that there is 
> another job already executing this rollback. Checking for expired heartbeat 
> and (re)-starting the heartbeat has to be done under a lock, so that multiple 
> jobs don't each start it at the same time and assume that they are the only 
> ones that are heartbeating. 
>  * The table lock is no longer needed after (5), since it can now be safely 
> assumed that no other job (calling this rollback API) will execute this 
> rollback instant. 
> One example implementation to achieve this:
>  
> {code:java}
> @Deprecated
> public boolean rollback(final String commitInstantTime, 
> Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
>     Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
>   final Timer.Context timerContext = this.metrics.getRollbackCtx();
>   final Option<HoodieInstant> commitInstantOpt;
>   final HoodieTable<T, I, K, O> table;
>   try {
>     table = createTable(config, hadoopConf);
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to initalize table for rollback 
> " + config.getBasePath() + " commits " + commitInstantTime, e);
>   }
>   final String rollbackInstantTime;
>   final boolean deleteInstantsDuringRollback;
>   final HoodieInstant instantToRollback;
>   try {
>     if (!skipLocking) {
>       // Do step 1 and 2
>       txnManager.beginTransaction();
>       table.getMetaClient().reloadActiveTimeline();
>     }
>     final Option<HoodiePendingRollbackInfo> previousAttemptedRollback;
>     if (skipLocking) {
>       // If skipLocking = true, then there directly use pendingRollbackInfo 
> without checking the status of this rollback instant on active timeline
>       // This is since the caller is responsible for ensuring there is no 
> concurrent rollback
>       previousAttemptedRollback = pendingRollbackInfo;
>     } else {
>       // step 3
>       // If skipLocking = false, we need to check the timeline for the latest 
> pending rollback, in case a concurrent rollback before
>       // step 1 has already executed pendingRollbackInfo
>       previousAttemptedRollback = 
> getPendingRollbackInfo(table.getMetaClient(), commitInstantTime, false);
>       if (pendingRollbackInfo.isPresent()) {
>         // step 3a If a pendingRollbackInfo was passed in, verify that it is 
> the same as the pending rollback that was just observed. If not, then
>         // abort the rollback
>         previousAttemptedRollback.orElseThrow(
>             () -> new HoodieRollbackException(
>                 String.format("Pending rollback instant %s no longer 
> inflight", pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
>             )
>         );
>         // This will only fail if the table is in an illegal state, where 
> there are 2+ rollback plans for one instant. This
>         // check shouldn't be necessary, but just keeping it here for now to 
> demonstrate
>         
> ValidationUtils.checkArgument(previousAttemptedRollback.get().getRollbackInstant().getTimestamp().equals(
>             pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
>         );
>       }
>     }
>     rollbackInstantTime = previousAttemptedRollback
>         .map(pendingRollback -> 
> pendingRollback.getRollbackInstant().getTimestamp())
>         .orElse(rollbackInstantTimeOpt.orElseGet(() -> 
> HoodieActiveTimeline.createNewInstantTime()));
>     commitInstantOpt = 
> Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
>         .filter(instant -> 
> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
>         .findFirst());
>     LOG.info(String.format("Begin rollback of instant %s at instantTime %s", 
> commitInstantTime, rollbackInstantTime));
>     LOG.info(String.format("Scheduling Rollback at instant time : %s "
>             + "(exists in active timeline: %s), with rollback plan: %s",
>         rollbackInstantTime, commitInstantOpt.isPresent(), 
> previousAttemptedRollback.isPresent()));
>     if (previousAttemptedRollback.isPresent()) {
>       if (commitInstantOpt.isPresent()) {
>         instantToRollback = commitInstantOpt.get();
>         deleteInstantsDuringRollback = true;
>       } else {
>         // A previous pending rollback plan still needs to be executed and 
> completed even if the instant to rollback
>         // is no longer in active timeline. This can be safely done by 
> re-creating the instant to rollback and
>         // configuring the rollback execution later on to not delete the 
> instants during rollback.
>         instantToRollback = new HoodieInstant(
>             true, 
> previousAttemptedRollback.get().getRollbackPlan().getInstantToRollback().getAction(),
>  commitInstantTime);
>         deleteInstantsDuringRollback = false;
>       }
>     } else {
>       // Step 3b
>       // A new rollback can only be scheduled if the commit to rollback is 
> still in the active timeline
>       if (!commitInstantOpt.isPresent()) {
>         LOG.warn("Cannot find instant " + commitInstantTime + " in the 
> timeline, for rollback");
>         return false;
>       }
>       instantToRollback = commitInstantOpt.get();
>       deleteInstantsDuringRollback = true;
>       Option<HoodieRollbackPlan> newRollbackPlanOption =
>           table.scheduleRollback(context, rollbackInstantTime, 
> commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
>       newRollbackPlanOption.orElseThrow(() -> new HoodieRollbackException(
>           String.format("Failed to schedule rollback of %s at instant time 
> %s", commitInstantTime, rollbackInstantTime))
>       );
>     }
>     // Step 4
>     // This heartbeating logic should/will only be triggered if skipLocking = 
> false. If
>     // the  rollback instant time has just been newly scheduled these 
> heartbeat checks will still correctly
>     // show the (non-existent) heartbeat as expired
>     if (!skipLocking) {
>       try {
>         if (heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
>           heartbeatClient.stop(rollbackInstantTime);
>         } else {
>           throw new HoodieRollbackException(String.format("Cannot execute 
> rollback instant %s due to active heartbeat", rollbackInstantTime);
>         }
>         heartbeatClient.start(rollbackInstantTime);
>       } catch (IOException e) {
>         throw new HoodieRollbackException(String.format("Could not access 
> last heartbeat for %s", rollbackInstantTime);
>       }
>     }
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to use/create rollback plan 
> for" + config.getBasePath() + " commits " + commitInstantTime, e);
>   } finally {
>     // Step 5
>     if (!skipLocking) {
>       txnManager.endTransaction();
>     }
>   }
>  // Step 6
>   try {
>     HoodieRollbackMetadata rollbackMetadata = table.rollback(context, 
> rollbackInstantTime, instantToRollback, deleteInstantsDuringRollback, 
> skipLocking);
>     if (timerContext != null) {
>       long durationInMs = metrics.getDurationInMs(timerContext.stop());
>       metrics.updateRollbackMetrics(durationInMs, 
> rollbackMetadata.getTotalFilesDeleted());
>     }
>     return true;
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to execute rollback " + 
> config.getBasePath() + " commits " + commitInstantTime, e);
>   } finally {
>     if (!skipLocking) {
>       heartbeatClient.stop(rollbackInstantTime);
>     }
>   }
> }{code}
>  
>  
> h2. Why might this change be useful?
> Although these scenarios can be resolved at the application/orchestration 
> level rather than HUDI, we are still working on this fix in our internal 
> deployment of HUDI since we want to avoid edge cases where 2+ jobs can call 
> this rollback API for the same instant at the same time.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to