[ 
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Sumit updated HUDI-6596:
------------------------------
    Fix Version/s: 1.0.0

>  Propose rollback implementation changes to guard against concurrent jobs
> -------------------------------------------------------------------------
>
>                 Key: HUDI-6596
>                 URL: https://issues.apache.org/jira/browse/HUDI-6596
>             Project: Apache Hudi
>          Issue Type: Wish
>            Reporter: Krishen Bhan
>            Priority: Trivial
>             Fix For: 1.0.0
>
>
> h1. Issue
> The existing rollback API in 0.14 
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
>  executes a rollback plan: it either reuses a pending rollback plan that the 
> caller passes in from a previous rollback attempt, or schedules a new 
> rollback instant if none is provided. Currently it is not safe for two 
> concurrent jobs to call this API (when skipLocking=false and the callers 
> aren't already holding a lock), because multiple requested rollback plans 
> can be created for the same commit, or two jobs can end up executing the 
> same rollback instant at the same time.
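
To make the hazard concrete, here is a minimal, self-contained sketch of the check-then-act race described above. It does not use any Hudi classes: `ToyTimeline`, `UnsafeRollbackRace` and `scheduleIfAbsent` are hypothetical names invented for illustration, and the interleaving of the two jobs is replayed sequentially so that the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the active timeline (not a Hudi class).
final class ToyTimeline {
  // Each entry is "rollbackInstantTime->commitInstantTime".
  private final List<String> requestedRollbacks = new ArrayList<>();

  // A caller's view of the timeline at the moment it was loaded.
  List<String> snapshot() {
    return new ArrayList<>(requestedRollbacks);
  }

  void addRequestedRollback(String rollbackInstant, String commitInstant) {
    requestedRollbacks.add(rollbackInstant + "->" + commitInstant);
  }

  long pendingRollbacksFor(String commitInstant) {
    return requestedRollbacks.stream()
        .filter(r -> r.endsWith("->" + commitInstant))
        .count();
  }
}

final class UnsafeRollbackRace {
  // Check-then-act against a possibly stale view, with no lock held.
  static void scheduleIfAbsent(ToyTimeline timeline, List<String> staleView,
                               String rollbackInstant, String commitInstant) {
    boolean alreadyPending =
        staleView.stream().anyMatch(r -> r.endsWith("->" + commitInstant));
    if (!alreadyPending) {
      timeline.addRequestedRollback(rollbackInstant, commitInstant);
    }
  }

  public static void main(String[] args) {
    ToyTimeline timeline = new ToyTimeline();
    // Both jobs load the timeline before either one schedules a rollback.
    List<String> viewOfJobA = timeline.snapshot();
    List<String> viewOfJobB = timeline.snapshot();
    scheduleIfAbsent(timeline, viewOfJobA, "r1", "c1");
    scheduleIfAbsent(timeline, viewOfJobB, "r2", "c1");
    System.out.println("pending rollback plans for c1: "
        + timeline.pendingRollbacksFor("c1"));
  }
}
```

Because both jobs decide based on a view taken before the other one wrote, the toy timeline ends up with two requested rollback plans for the same commit, which is the illegal state this issue wants to rule out.
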
> h1. Proposed change
> One way to resolve this issue is to refactor this rollback function so that, 
> if skipLocking=false, it follows these steps:
>  # Acquire the table lock
>  # Reload the active timeline
>  # Look at the active timeline to see if there is an inflight rollback 
> instant from a previous rollback attempt; if one exists, assign it as the 
> rollback plan to execute. Also, check whether a pending rollback plan was 
> passed in by the caller. Then execute one of the following sub-steps, 
> depending on whether the caller passed a pending rollback plan.
>  ##  [a] If a pending inflight rollback plan was passed in by the caller, 
> then check that there is a previously attempted rollback instant on the 
> timeline (and that the instant times match) and continue to use this rollback 
> plan. If that isn't the case, then raise a rollback exception, since this 
> means another job has already executed this plan concurrently. Note that in a 
> valid HUDI dataset there can be at most one rollback instant for a given 
> commit instant, which is why, if we no longer see a pending rollback in the 
> timeline in this phase, we can safely assume that it has already been 
> executed to completion.
>  ##  [b] If no pending inflight rollback plan was passed in by the caller and 
> no pending rollback instant was found in the timeline earlier, then schedule 
> a new rollback plan.
>  # Now that a rollback plan and requested rollback instant time have been 
> assigned, check for an active heartbeat for the rollback instant time. If 
> there is one, then abort the rollback, as that means there is a concurrent 
> job executing that rollback. If not, then start a heartbeat for that rollback 
> instant time.
>  # Release the table lock
>  # Execute the rollback plan and complete the rollback instant. Regardless of 
> whether this succeeds or fails with an exception, close the heartbeat. This 
> increases the chance that the next job that calls this rollback API will 
> follow through with the rollback instead of aborting because of a still 
> active heartbeat from the previous attempt.
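
As a rough illustration of steps 1 to 3 and 5 above, the following self-contained sketch models the timeline as an in-memory map guarded by a lock. Everything here is an assumption made for illustration: `ToyRollbackPlanner`, `resolveRollbackInstant` and `complete` are hypothetical names, not Hudi APIs, and step 4 (the heartbeat check) is deliberately left out to keep the example focused on plan resolution.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory model; none of these names are Hudi APIs.
final class ToyRollbackPlanner {
  private final ReentrantLock tableLock = new ReentrantLock();
  // commit instant -> pending rollback instant (at most one per commit).
  private final Map<String, String> pendingRollbackByCommit = new HashMap<>();
  private int nextInstant = 1;

  // Returns the rollback instant that this caller should go on to execute.
  String resolveRollbackInstant(String commitInstant,
                                Optional<String> callerPendingRollback) {
    tableLock.lock(); // step 1: acquire the table lock
    try {
      // step 2: reading shared state under the lock stands in for
      // reloading the active timeline
      Optional<String> onTimeline =
          Optional.ofNullable(pendingRollbackByCommit.get(commitInstant));
      if (callerPendingRollback.isPresent()) {
        // step 3a: the caller's plan must still be pending and must match
        String expected = callerPendingRollback.get();
        if (!onTimeline.isPresent()) {
          throw new IllegalStateException(
              "Pending rollback " + expected + " is no longer inflight");
        }
        if (!onTimeline.get().equals(expected)) {
          throw new IllegalStateException(
              "More than one rollback plan for commit " + commitInstant);
        }
        return expected;
      }
      if (onTimeline.isPresent()) {
        // step 3: reuse the pending rollback left by a previous attempt
        return onTimeline.get();
      }
      // step 3b: nothing pending anywhere, so schedule a new plan
      String newInstant = "r" + nextInstant++;
      pendingRollbackByCommit.put(commitInstant, newInstant);
      return newInstant;
    } finally {
      tableLock.unlock(); // step 5: release the table lock
    }
  }

  // Marks the rollback of a commit as completed (no longer pending).
  void complete(String commitInstant) {
    tableLock.lock();
    try {
      pendingRollbackByCommit.remove(commitInstant);
    } finally {
      tableLock.unlock();
    }
  }
}
```

In this toy model, two callers that both pass no pending plan for the same commit receive the same rollback instant, and a caller that passes a plan which has since been completed gets an exception instead of silently scheduling a second plan.
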
>  
>  * These steps will only be enforced for skipLocking=false, since 
> skipLocking=true means that the caller may already be explicitly holding 
> a table lock. In that case, acquiring the lock again in step (1) will fail.
>  * Acquiring a lock and reloading the timeline in (1-3) guards against race 
> conditions where another job calls this rollback API at the same time and 
> schedules its own rollback plan and instant. This is needed because, if no 
> rollback has been attempted before for this instant, then before step (1) 
> there is a window of time where another concurrent rollback job could have 
> scheduled a rollback plan, failed execution, and cleaned up its heartbeat, 
> all while the current rollback job is running. As a result, even if the 
> current job was not passed a pending rollback plan, it still needs to check 
> the active timeline to ensure that no new pending rollback instant has been 
> created.
>  * Using a heartbeat signals to callers in other jobs that there is already 
> a job executing this rollback. Checking for an expired heartbeat and 
> (re)starting the heartbeat has to be done under a lock, so that multiple 
> jobs don't each start it at the same time and assume that they are the only 
> ones that are heartbeating.
>  * The table lock is no longer needed after (4), since once the heartbeat 
> has been started it can be safely assumed that no other job (calling this 
> rollback API) will execute this rollback instant.
> One example implementation to achieve this:
>  
> {code:java}
> @Deprecated
> public boolean rollback(final String commitInstantTime, 
> Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
>     Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
>   final Timer.Context timerContext = this.metrics.getRollbackCtx();
>   final Option<HoodieInstant> commitInstantOpt;
>   final HoodieTable<T, I, K, O> table;
>   try {
>     table = createTable(config, hadoopConf);
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to initialize table for rollback "
>         + config.getBasePath() + " commits " + commitInstantTime, e);
>   }
>   final String rollbackInstantTime;
>   final boolean deleteInstantsDuringRollback;
>   final HoodieInstant instantToRollback;
>   try {
>     if (!skipLocking) {
>       // Do step 1 and 2
>       txnManager.beginTransaction();
>       table.getMetaClient().reloadActiveTimeline();
>     }
>     final Option<HoodiePendingRollbackInfo> previousAttemptedRollback;
>     if (skipLocking) {
>       // If skipLocking = true, then directly use pendingRollbackInfo without
>       // checking the status of this rollback instant on the active timeline.
>       // This is because the caller is responsible for ensuring there is no
>       // concurrent rollback.
>       previousAttemptedRollback = pendingRollbackInfo;
>     } else {
>       // step 3
>       // If skipLocking = false, we need to check the timeline for the latest
>       // pending rollback, in case a concurrent rollback before step 1 has
>       // already executed pendingRollbackInfo
>       previousAttemptedRollback = 
> getPendingRollbackInfo(table.getMetaClient(), commitInstantTime, false);
>       if (pendingRollbackInfo.isPresent()) {
>         // step 3a If a pendingRollbackInfo was passed in, verify that it is
>         // the same as the pending rollback that was just observed. If not,
>         // then abort the rollback
>         previousAttemptedRollback.orElseThrow(
>             () -> new HoodieRollbackException(
>                 String.format("Pending rollback instant %s no longer inflight",
>                     pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
>             )
>         );
>         // This will only fail if the table is in an illegal state, where
>         // there are 2+ rollback plans for one instant. This check shouldn't
>         // be necessary, but just keeping it here for now to demonstrate
>         ValidationUtils.checkArgument(
>             previousAttemptedRollback.get().getRollbackInstant().getTimestamp().equals(
>                 pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
>         );
>       }
>     }
>     rollbackInstantTime = previousAttemptedRollback
>         .map(pendingRollback -> 
> pendingRollback.getRollbackInstant().getTimestamp())
>         .orElse(rollbackInstantTimeOpt.orElseGet(() -> 
> HoodieActiveTimeline.createNewInstantTime()));
>     commitInstantOpt = 
> Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
>         .filter(instant -> 
> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
>         .findFirst());
>     LOG.info(String.format("Begin rollback of instant %s at instantTime %s", 
> commitInstantTime, rollbackInstantTime));
>     LOG.info(String.format("Scheduling Rollback at instant time : %s "
>             + "(exists in active timeline: %s), with rollback plan: %s",
>         rollbackInstantTime, commitInstantOpt.isPresent(), 
> previousAttemptedRollback.isPresent()));
>     if (previousAttemptedRollback.isPresent()) {
>       if (commitInstantOpt.isPresent()) {
>         instantToRollback = commitInstantOpt.get();
>         deleteInstantsDuringRollback = true;
>       } else {
>         // A previous pending rollback plan still needs to be executed and
>         // completed even if the instant to rollback is no longer in the
>         // active timeline. This can be safely done by re-creating the
>         // instant to rollback and configuring the rollback execution later
>         // on to not delete the instants during rollback.
>         instantToRollback = new HoodieInstant(
>             true,
>             previousAttemptedRollback.get().getRollbackPlan().getInstantToRollback().getAction(),
>             commitInstantTime);
>         deleteInstantsDuringRollback = false;
>       }
>     } else {
>       // Step 3b
>       // A new rollback can only be scheduled if the commit to rollback is
>       // still in the active timeline
>       if (!commitInstantOpt.isPresent()) {
>         LOG.warn("Cannot find instant " + commitInstantTime
>             + " in the timeline, for rollback");
>         return false;
>       }
>       instantToRollback = commitInstantOpt.get();
>       deleteInstantsDuringRollback = true;
>       Option<HoodieRollbackPlan> newRollbackPlanOption =
>           table.scheduleRollback(context, rollbackInstantTime, 
> commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
>       newRollbackPlanOption.orElseThrow(() -> new HoodieRollbackException(
>           String.format("Failed to schedule rollback of %s at instant time %s",
>               commitInstantTime, rollbackInstantTime))
>       );
>     }
>     // Step 4
>     // This heartbeating logic should/will only be triggered if
>     // skipLocking = false. If the rollback instant time has just been newly
>     // scheduled, these heartbeat checks will still correctly show the
>     // (non-existent) heartbeat as expired
>     if (!skipLocking) {
>       try {
>         if (heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
>           heartbeatClient.stop(rollbackInstantTime);
>         } else {
>           throw new HoodieRollbackException(String.format(
>               "Cannot execute rollback instant %s due to active heartbeat",
>               rollbackInstantTime));
>         }
>         heartbeatClient.start(rollbackInstantTime);
>       } catch (IOException e) {
>         throw new HoodieRollbackException(String.format(
>             "Could not access last heartbeat for %s", rollbackInstantTime), e);
>       }
>     }
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to use/create rollback plan for "
>         + config.getBasePath() + " commits " + commitInstantTime, e);
>   } finally {
>     // Step 5
>     if (!skipLocking) {
>       txnManager.endTransaction();
>     }
>   }
>   // Step 6
>   try {
>     HoodieRollbackMetadata rollbackMetadata = table.rollback(context, 
> rollbackInstantTime, instantToRollback, deleteInstantsDuringRollback, 
> skipLocking);
>     if (timerContext != null) {
>       long durationInMs = metrics.getDurationInMs(timerContext.stop());
>       metrics.updateRollbackMetrics(durationInMs, 
> rollbackMetadata.getTotalFilesDeleted());
>     }
>     return true;
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to execute rollback " + 
> config.getBasePath() + " commits " + commitInstantTime, e);
>   } finally {
>     if (!skipLocking) {
>       heartbeatClient.stop(rollbackInstantTime);
>     }
>   }
> }{code}
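
The example above depends on Hudi internals, so the heartbeat part (steps 4 to 6) is hard to try out in isolation. The sketch below is a simplified, self-contained model of just that part, under stated assumptions: `ToyHeartbeats`, `tryClaim` and `executeIfClaimed` are hypothetical names and not the Hudi heartbeat client, the clock is injected so that expiry can be exercised deterministically, and the heartbeat is only stamped once rather than refreshed periodically as a real heartbeat would be.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;

// Hypothetical model of steps 4-6; this is not the Hudi heartbeat client.
final class ToyHeartbeats {
  private final ReentrantLock tableLock = new ReentrantLock();
  // rollback instant -> time of the last recorded heartbeat, in millis.
  private final Map<String, Long> lastBeatMillis = new HashMap<>();
  private final long expiryMillis;
  private final LongSupplier clock;

  ToyHeartbeats(long expiryMillis, LongSupplier clock) {
    this.expiryMillis = expiryMillis;
    this.clock = clock;
  }

  // Step 4: under the lock, claim the instant only if no live heartbeat
  // exists. A missing heartbeat counts as expired.
  boolean tryClaim(String rollbackInstant) {
    tableLock.lock();
    try {
      Long last = lastBeatMillis.get(rollbackInstant);
      boolean expired = last == null || clock.getAsLong() - last > expiryMillis;
      if (!expired) {
        return false; // another job is still executing this rollback
      }
      lastBeatMillis.put(rollbackInstant, clock.getAsLong());
      return true;
    } finally {
      tableLock.unlock(); // step 5: release the lock before executing
    }
  }

  // Removes the heartbeat. The lock here only protects the HashMap.
  void stop(String rollbackInstant) {
    tableLock.lock();
    try {
      lastBeatMillis.remove(rollbackInstant);
    } finally {
      tableLock.unlock();
    }
  }

  // Step 6: run the rollback outside the lock and always stop the
  // heartbeat, whether the work succeeds or throws.
  boolean executeIfClaimed(String rollbackInstant, Runnable rollbackWork) {
    if (!tryClaim(rollbackInstant)) {
      return false;
    }
    try {
      rollbackWork.run();
      return true;
    } finally {
      stop(rollbackInstant);
    }
  }
}
```

The check and the start happen inside one critical section, so in this model two callers cannot both observe an expired heartbeat and both claim the same rollback instant. Stopping the heartbeat in a `finally` block means a failed attempt does not block the next caller until the expiry interval has passed.
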
>  
>  
> h2. Why might this change be useful?
> Although these scenarios can be resolved at the application/orchestration 
> level rather than HUDI, we are still working on this fix in our internal 
> deployment of HUDI since we want to avoid edge cases where 2+ jobs can call 
> this rollback API for the same instant at the same time.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
