[ https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sagar Sumit updated HUDI-6596:
------------------------------
    Fix Version/s: 1.0.0

> Propose rollback implementation changes to guard against concurrent jobs
> -------------------------------------------------------------------------
>
>                 Key: HUDI-6596
>                 URL: https://issues.apache.org/jira/browse/HUDI-6596
>             Project: Apache Hudi
>          Issue Type: Wish
>            Reporter: Krishen Bhan
>            Priority: Trivial
>             Fix For: 1.0.0
>
>
> h1. Issue
> The existing rollback API in 0.14
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
> executes a rollback plan. It either takes an existing rollback plan provided
> by the caller from a previous rollback attempt, or schedules a new rollback
> instant if none is provided. Currently it is not safe for two concurrent jobs
> to call this API (when skipLocking=false and the callers aren't already
> holding a lock), as this can lead to multiple requested rollback plans being
> created, or to two jobs executing the same rollback instant at the same time.
> h1. Proposed change
> One way to resolve this issue is to refactor this rollback function such that,
> if skipLocking=false, the following steps are followed:
> # Acquire the table lock
> # Reload the active timeline
> # Look at the active timeline to see if there is an inflight rollback instant
> from a previous rollback attempt. If it exists, assign it as the rollback plan
> to execute. Also check whether a pending rollback plan was passed in by the
> caller. Then execute one of the following steps, depending on whether the
> caller passed a pending rollback instant plan.
> ## [a] If a pending inflight rollback plan was passed in by the caller, check
> that there is a previously attempted rollback instant on the timeline (and
> that the instant times match) and continue to use this rollback plan.
> If that isn't the case, raise a rollback exception, since this means another
> job has concurrently already executed this plan. Note that in a valid HUDI
> dataset there can be at most one rollback instant for a corresponding commit
> instant, which is why, if we no longer see a pending rollback in the timeline
> in this phase, we can safely assume that it has already been executed to
> completion.
> ## [b] If no pending inflight rollback plan was passed in by the caller and no
> pending rollback instant was found in the timeline earlier, schedule a new
> rollback plan.
> # Now that a rollback plan and requested rollback instant time have been
> assigned, check for an active heartbeat for the rollback instant time. If
> there is one, abort the rollback, as that means there is a concurrent job
> executing that rollback. If not, start a heartbeat for that rollback instant
> time.
> # Release the table lock
> # Execute the rollback plan and complete the rollback instant. Regardless of
> whether this succeeds or fails with an exception, close the heartbeat. This
> increases the chance that the next job that calls this rollback API will
> follow through with the rollback and not abort due to an active previous
> heartbeat.
>
> * These steps will only be enforced for skipLocking=false, since
> skipLocking=true means the caller may already be explicitly holding a table
> lock. In that case, acquiring the lock again in step (1) would fail.
> * Acquiring a lock and reloading the timeline in steps (1-3) guards against
> race conditions where another job calls this rollback API at the same time and
> schedules its own rollback plan and instant. This is because, if no rollback
> has been attempted before for this instant, then before step (1) there is a
> window of time where another concurrent rollback job could have scheduled a
> rollback plan, failed execution, and cleaned up its heartbeat, all while the
> current rollback job is running.
> As a result, even if the current job was passed an empty pending rollback
> plan, it still needs to check the active timeline to ensure that no new
> pending rollback instant has been created.
> * Using a heartbeat signals to callers in other jobs that there is another job
> already executing this rollback. Checking for an expired heartbeat and
> (re)starting the heartbeat has to be done under a lock, so that multiple jobs
> don't each start it at the same time and assume that they are the only ones
> heartbeating.
> * The table lock is no longer needed once the heartbeat has been started in
> step (4), which is why it is released in step (5): from then on it can be
> safely assumed that no other job (calling this rollback API) will execute
> this rollback instant.
> One example implementation to achieve this:
>
> {code:java}
> @Deprecated
> public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo,
>     boolean skipLocking, Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
>   final Timer.Context timerContext = this.metrics.getRollbackCtx();
>   final Option<HoodieInstant> commitInstantOpt;
>   final HoodieTable<T, I, K, O> table;
>   try {
>     table = createTable(config, hadoopConf);
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to initialize table for rollback "
>         + config.getBasePath() + " commits " + commitInstantTime, e);
>   }
>   final String rollbackInstantTime;
>   final boolean deleteInstantsDuringRollback;
>   final HoodieInstant instantToRollback;
>   try {
>     if (!skipLocking) {
>       // Steps 1 and 2
>       txnManager.beginTransaction();
>       table.getMetaClient().reloadActiveTimeline();
>     }
>     final Option<HoodiePendingRollbackInfo> previousAttemptedRollback;
>     if (skipLocking) {
>       // If skipLocking = true, directly use pendingRollbackInfo without checking the status of this
>       // rollback instant on the active timeline, since the caller is responsible for ensuring
>       // there is no concurrent rollback.
>       previousAttemptedRollback =
>           pendingRollbackInfo;
>     } else {
>       // Step 3
>       // If skipLocking = false, we need to check the timeline for the latest pending rollback, in case
>       // a concurrent rollback before step 1 has already executed pendingRollbackInfo.
>       previousAttemptedRollback = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime, false);
>       if (pendingRollbackInfo.isPresent()) {
>         // Step 3a: if a pendingRollbackInfo was passed in, verify that it is the same as the pending
>         // rollback that was just observed. If not, abort the rollback.
>         previousAttemptedRollback.orElseThrow(
>             () -> new HoodieRollbackException(
>                 String.format("Pending rollback instant %s no longer inflight",
>                     pendingRollbackInfo.get().getRollbackInstant().getTimestamp())));
>         // This will only fail if the table is in an illegal state, where there are 2+ rollback plans
>         // for one instant. This check shouldn't be necessary; it is kept here for demonstration.
>         ValidationUtils.checkArgument(
>             previousAttemptedRollback.get().getRollbackInstant().getTimestamp().equals(
>                 pendingRollbackInfo.get().getRollbackInstant().getTimestamp()));
>       }
>     }
>     rollbackInstantTime = previousAttemptedRollback
>         .map(pendingRollback -> pendingRollback.getRollbackInstant().getTimestamp())
>         .orElse(rollbackInstantTimeOpt.orElseGet(() -> HoodieActiveTimeline.createNewInstantTime()));
>     commitInstantOpt = Option.fromJavaOptional(
>         table.getActiveTimeline().getCommitsTimeline().getInstants()
>             .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
>             .findFirst());
>     LOG.info(String.format("Begin rollback of instant %s at instantTime %s",
>         commitInstantTime, rollbackInstantTime));
>     LOG.info(String.format("Scheduling Rollback at instant time : %s "
>         + "(exists in active timeline: %s), with rollback plan: %s",
>         rollbackInstantTime, commitInstantOpt.isPresent(), previousAttemptedRollback.isPresent()));
>     if (previousAttemptedRollback.isPresent()) {
>       if (commitInstantOpt.isPresent()) {
>         instantToRollback = commitInstantOpt.get();
>         deleteInstantsDuringRollback = true;
>       } else {
>         // A previous pending rollback plan still needs to be executed and completed even if the
>         // instant to roll back is no longer in the active timeline. This can be done safely by
>         // re-creating the instant to roll back and configuring the rollback execution later on
>         // to not delete the instants during rollback.
>         instantToRollback = new HoodieInstant(
>             true,
>             previousAttemptedRollback.get().getRollbackPlan().getInstantToRollback().getAction(),
>             commitInstantTime);
>         deleteInstantsDuringRollback = false;
>       }
>     } else {
>       // Step 3b
>       // A new rollback can only be scheduled if the commit to roll back is still in the active timeline.
>       if (!commitInstantOpt.isPresent()) {
>         LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
>         return false;
>       }
>       instantToRollback = commitInstantOpt.get();
>       deleteInstantsDuringRollback = true;
>       Option<HoodieRollbackPlan> newRollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime,
>           commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
>       newRollbackPlanOption.orElseThrow(() -> new HoodieRollbackException(
>           String.format("Failed to schedule rollback of %s at instant time %s",
>               commitInstantTime, rollbackInstantTime)));
>     }
>     // Step 4
>     // This heartbeating logic should only be triggered if skipLocking = false.
>     // If the rollback instant time has just been newly scheduled, these heartbeat checks will still
>     // correctly show the (non-existent) heartbeat as expired.
>     if (!skipLocking) {
>       try {
>         if (heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
>           heartbeatClient.stop(rollbackInstantTime);
>         } else {
>           throw new HoodieRollbackException(
>               String.format("Cannot execute rollback instant %s due to active heartbeat", rollbackInstantTime));
>         }
>         heartbeatClient.start(rollbackInstantTime);
>       } catch (IOException e) {
>         throw new HoodieRollbackException(
>             String.format("Could not access last heartbeat for %s", rollbackInstantTime), e);
>       }
>     }
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to use/create rollback plan for "
>         + config.getBasePath() + " commits " + commitInstantTime, e);
>   } finally {
>     // Step 5
>     if (!skipLocking) {
>       txnManager.endTransaction();
>     }
>   }
>   // Step 6
>   try {
>     HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, instantToRollback,
>         deleteInstantsDuringRollback, skipLocking);
>     if (timerContext != null) {
>       long durationInMs = metrics.getDurationInMs(timerContext.stop());
>       metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
>     }
>     return true;
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to execute rollback " + config.getBasePath()
>         + " commits " + commitInstantTime, e);
>   } finally {
>     // Close the heartbeat whether the rollback succeeded or failed
>     if (!skipLocking) {
>       heartbeatClient.stop(rollbackInstantTime);
>     }
>   }
> }
> {code}
>
>
> h2. Why might this change be useful?
> Although these scenarios can be resolved at the application/orchestration
> level rather than in HUDI, we are still working on this fix in our internal
> deployment of HUDI, since we want to avoid edge cases where 2+ jobs call this
> rollback API for the same instant at the same time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
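
As a supplementary illustration of step 4 of the proposed change (checking for and starting a heartbeat while the table lock is held), the following is a minimal, self-contained sketch. It is not Hudi code: the class `RollbackClaimSketch`, its methods `tryClaim` and `release`, the in-memory heartbeat map, and the use of `ReentrantLock` in place of the table lock are all hypothetical stand-ins chosen so that the example runs on its own.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of "check and start heartbeat under a lock"; not part of Hudi. */
final class RollbackClaimSketch {
  // Stands in for the table lock taken in step 1 and released in step 5.
  private final ReentrantLock tableLock = new ReentrantLock();
  // Rollback instant time -> time of last heartbeat (assumed in-memory store).
  private final Map<String, Instant> heartbeats = new HashMap<>();
  private final Duration expiry;

  RollbackClaimSketch(Duration expiry) {
    this.expiry = expiry;
  }

  /** Returns true if the caller may execute the rollback, false if another job holds a live heartbeat. */
  boolean tryClaim(String rollbackInstantTime, Instant now) {
    tableLock.lock();
    try {
      Instant last = heartbeats.get(rollbackInstantTime);
      boolean active = last != null && Duration.between(last, now).compareTo(expiry) < 0;
      if (active) {
        return false; // a concurrent job is executing this rollback, so abort
      }
      heartbeats.put(rollbackInstantTime, now); // (re)start the heartbeat while still holding the lock
      return true;
    } finally {
      tableLock.unlock();
    }
  }

  /** Closes the heartbeat; meant to be called in a finally block after the rollback executes. */
  void release(String rollbackInstantTime) {
    tableLock.lock();
    try {
      heartbeats.remove(rollbackInstantTime);
    } finally {
      tableLock.unlock();
    }
  }

  public static void main(String[] args) {
    RollbackClaimSketch guard = new RollbackClaimSketch(Duration.ofSeconds(60));
    Instant t0 = Instant.parse("2023-07-01T00:00:00Z");
    System.out.println(guard.tryClaim("20230701000000", t0));                 // true: no heartbeat yet
    System.out.println(guard.tryClaim("20230701000000", t0.plusSeconds(10))); // false: heartbeat active
    guard.release("20230701000000");
    System.out.println(guard.tryClaim("20230701000000", t0.plusSeconds(20))); // true: heartbeat was closed
  }
}
```

Because the expiry check and the heartbeat start happen in the same critical section, two callers cannot both observe an expired heartbeat and both proceed, which is the property the fourth bullet of the proposal relies on.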