[
https://issues.apache.org/jira/browse/HADOOP-19497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936995#comment-17936995
]
ASF GitHub Bot commented on HADOOP-19497:
-----------------------------------------
manika137 commented on code in PR #7509:
URL: https://github.com/apache/hadoop/pull/7509#discussion_r2004765573
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java:
##########
@@ -1664,4 +1528,301 @@ public String addClientTransactionIdToHeader(List<AbfsHttpHeader> requestHeaders
}
return clientTransactionId;
}
+
+ /**
+ * Attempts to rename a path with client transaction ID (CTId) recovery mechanism.
+ * If the initial rename attempt fails, it tries to recover using CTId or ETag
+ * and retries the operation.
+ *
+ * @param source the source path to be renamed
+ * @param destination the destination path for the rename
+ * @param continuation the continuation token for the operation
+ * @param tracingContext the context for tracing the operation
+ * @param sourceEtag the ETag of the source path for conditional requests
+ * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete
+ * @return an {@link AbfsClientRenameResult} containing the result of the rename operation
+ * @throws IOException if an error occurs during the rename operation
+ */
+ private AbfsClientRenameResult renameWithCTIdRecovery(String source,
+ String destination, String continuation, TracingContext tracingContext,
+ String sourceEtag, boolean isMetadataIncompleteState) throws IOException {
+ // Get request headers for rename operation
+ List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source);
+ // Add client transaction ID to the request headers
+ String clientTransactionId = addClientTransactionIdToHeader(requestHeaders);
+
+ // Create the URL for the rename operation
+ final URL url = createRequestUrl(destination,
+ getRenameQueryBuilder(destination, continuation).toString());
+
+ // Create the rename operation
+ AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
+ try {
+ incrementAbfsRenamePath();
+ op.execute(tracingContext);
+ // AbfsClientResult contains the AbfsOperation, If recovery happened or
+ // not, and the incompleteMetaDataState is true or false.
+ // If we successfully rename a path and isMetadataIncompleteState was
+ // true, then rename was recovered, else it didn't, this is why
+ // isMetadataIncompleteState is used for renameRecovery(as the 2nd param).
+ return new AbfsClientRenameResult(op, isMetadataIncompleteState,
+ isMetadataIncompleteState);
+ } catch (AzureBlobFileSystemException e) {
+ // Handle rename exceptions and retry if applicable
+ handleRenameException(source, destination, continuation,
+ tracingContext, sourceEtag, op, isMetadataIncompleteState, e);
+
+ // Check if the operation is a retried request and if the error code indicates
+ // that the source path was not found. If so, attempt recovery using CTId.
+ if (op.isARetriedRequest()
+ && SOURCE_PATH_NOT_FOUND.getErrorCode()
+ .equalsIgnoreCase(op.getResult().getStorageErrorCode())) {
+ if (recoveryUsingCTId(destination, tracingContext, clientTransactionId)) {
+ return new AbfsClientRenameResult(
+ getSuccessOp(AbfsRestOperationType.RenamePath,
+ HTTP_METHOD_PUT, url, requestHeaders),
+ true, isMetadataIncompleteState);
+ }
+ }
+
+ // Attempt recovery using ETag if applicable
+ if (recoveryUsingEtag(source, destination, sourceEtag,
+ op, tracingContext, true)) {
+ return new AbfsClientRenameResult(
+ getSuccessOp(AbfsRestOperationType.RenamePath,
+ HTTP_METHOD_PUT, url, requestHeaders),
+ true,
+ isMetadataIncompleteState);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Attempts to recover a rename operation using ETag. If the source ETag is not provided, it attempts
+ * to fetch it and retry the operation. If recovery fails, it throws the exception.
+ *
+ * @param source the source path to be renamed
+ * @param destination the destination path for the rename
+ * @param continuation the continuation token for the operation
+ * @param tracingContext the context for tracing the operation
+ * @param sourceEtag the ETag of the source path for conditional requests
+ * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete
+ * @return an {@link AbfsClientRenameResult} containing the result of the rename operation
+ * @throws IOException if an error occurs during the rename operation or recovery
+ */
+ private AbfsClientRenameResult renameWithETagRecovery(String source,
+ String destination, String continuation,
+ TracingContext tracingContext, String sourceEtag,
+ boolean isMetadataIncompleteState) throws IOException {
+ boolean hasEtag = !isEmpty(sourceEtag);
+ boolean shouldAttemptRecovery = isRenameResilience() && getIsNamespaceEnabled();
+ if (!hasEtag && shouldAttemptRecovery) {
+ // in case eTag is already not supplied to the API
+ // and rename resilience is expected and it is an HNS enabled account
+ // fetch the source etag to be used later in recovery
+ try {
+ final AbfsRestOperation srcStatusOp = getPathStatus(source,
+ false, tracingContext, null);
+ if (srcStatusOp.hasResult()) {
+ final AbfsHttpOperation result = srcStatusOp.getResult();
+ sourceEtag = extractEtagHeader(result);
+ // and update the directory status.
+ boolean isDir = checkIsDir(result);
+ shouldAttemptRecovery = !isDir;
+ LOG.debug(
+ "Retrieved etag of source for rename recovery: {}; isDir={}",
+ sourceEtag, isDir);
+ }
+ } catch (AbfsRestOperationException e) {
+ throw new AbfsRestOperationException(e.getStatusCode(),
+ SOURCE_PATH_NOT_FOUND.getErrorCode(), e.getMessage(), e);
+ }
+ }
+
+ // Get request headers for rename operation
+ List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source);
+
+ // Create the URL for the rename operation
+ final URL url = createRequestUrl(destination,
+ getRenameQueryBuilder(destination, continuation).toString());
+
+ // Create the rename operation
+ AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
+ try {
+ incrementAbfsRenamePath();
+ op.execute(tracingContext);
+ // AbfsClientResult contains the AbfsOperation, If recovery happened or
+ // not, and the incompleteMetaDataState is true or false.
+ // If we successfully rename a path and isMetadataIncompleteState was
+ // true, then rename was recovered, else it didn't, this is why
+ // isMetadataIncompleteState is used for renameRecovery(as the 2nd param).
+ return new AbfsClientRenameResult(op, isMetadataIncompleteState,
+ isMetadataIncompleteState);
+ } catch (AzureBlobFileSystemException e) {
+ // Handle rename exceptions and retry if applicable
+ handleRenameException(source, destination, continuation,
+ tracingContext, sourceEtag, op, isMetadataIncompleteState, e);
+
+ // Attempt recovery using ETag if applicable
+ if (recoveryUsingEtag(source, destination, sourceEtag,
+ op, tracingContext, shouldAttemptRecovery)) {
+ return new AbfsClientRenameResult(
+ getSuccessOp(AbfsRestOperationType.RenamePath,
+ HTTP_METHOD_PUT, url, requestHeaders),
+ true, isMetadataIncompleteState);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Creates a list of HTTP headers required for a rename operation, including the encoded source path
+ * and SAS token if applicable.
+ *
+ * @param source the source path for the rename operation
+ * @return a list of {@link AbfsHttpHeader} containing the headers for the rename request
+ * @throws IOException if an error occurs while creating the headers or encoding the source path
+ */
+ private List<AbfsHttpHeader> getHeadersForRename(final String source)
+ throws IOException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ String encodedRenameSource = urlEncode(
+ FORWARD_SLASH + this.getFileSystem() + source);
+
+ if (getAuthType() == AuthType.SAS) {
+ final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
+ appendSASTokenToQuery(source,
+ SASTokenProvider.RENAME_SOURCE_OPERATION, srcQueryBuilder);
+ encodedRenameSource += srcQueryBuilder.toString();
+ }
+
+ LOG.trace("Rename source queryparam added {}", encodedRenameSource);
+ requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource));
+ requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
+ return requestHeaders;
+ }
+
+ /**
+ * Builds a query builder for the rename operation URL, including the continuation token and SAS token
+ * for the destination path.
+ *
+ * @param destination the destination path for the rename operation
+ * @param continuation the continuation token for the operation
+ * @return an {@link AbfsUriQueryBuilder} containing the query parameters for the rename operation
+ * @throws AzureBlobFileSystemException if an error occurs while appending the SAS token
+ */
+ private AbfsUriQueryBuilder getRenameQueryBuilder(final String destination,
+ final String continuation) throws AzureBlobFileSystemException {
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+ appendSASTokenToQuery(destination,
+ SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
+ return abfsUriQueryBuilder;
+ }
+
+ /**
+ * Attempts to recover a rename operation using the client transaction ID (CTId).
+ * It checks if the provided CTId matches the one in the response header for the destination path.
+ *
+ * @param destination the destination path for the rename operation
+ * @param tracingContext the context for tracing the operation
+ * @param clientTransactionId the client transaction ID to be used for recovery
+ * @return true if the client transaction ID matches, indicating recovery can proceed; false otherwise
+ * @throws AzureBlobFileSystemException if an error occurs while retrieving the path status
+ */
+ private boolean recoveryUsingCTId(String destination,
+ TracingContext tracingContext, String clientTransactionId)
+ throws AzureBlobFileSystemException {
+ try {
+ final AbfsHttpOperation abfsHttpOperation =
+ getPathStatus(destination, false,
+ tracingContext, null).getResult();
+ return clientTransactionId.equals(
+ abfsHttpOperation.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID));
+ } catch (AzureBlobFileSystemException exception) {
+ throw new AbfsDriverException(ERR_RENAME_RECOVERY, exception);
+ }
+ }
+
+ /**
+ * Attempts recovery using an ETag for the given source and destination.
+ * If recovery is enabled and rename resilience is supported, performs an idempotency check
+ * for the rename operation.
+ *
+ * @param source the source path to be renamed
+ * @param destination the destination path for the rename
+ * @param sourceEtag the ETag of the source path for conditional requests
+ * @param op the AbfsRestOperation object for the rename operation
+ * @param tracingContext the context for tracing the operation
+ * @param shouldAttemptRecovery flag indicating whether recovery should be attempted
+ * @return true if the recovery attempt was successful, false otherwise
+ */
+ private boolean recoveryUsingEtag(String source, String destination,
+ String sourceEtag, AbfsRestOperation op, TracingContext tracingContext,
+ boolean shouldAttemptRecovery) {
+ if (shouldAttemptRecovery && isRenameResilience()) {
+ return renameIdempotencyCheckOp(source, sourceEtag,
+ op, destination, tracingContext);
+ }
+ return false;
+ }
+
+ /**
+ * Checks for rename operation exceptions and handles them accordingly.
+ * Throws an exception or retries the operation if certain error conditions are met,
+ * such as unauthorized overwrite or missing destination parent path.
+ *
+ * @param source The source path for the rename operation.
+ * @param destination The destination path for the rename operation.
+ * @param continuation Continuation token for the operation, if applicable.
+ * @param tracingContext The tracing context for tracking the operation.
+ * @param sourceEtag The ETag of the source path for metadata validation.
+ * @param op The ABFS operation result for the rename attempt.
+ * @param isMetadataIncompleteState Flag indicating if metadata is incomplete.
+ * @throws IOException If an I/O error occurs during the rename operation.
+ * @throws FileAlreadyExistsException If the destination file already exists and overwrite is unauthorized.
Review Comment:
We only get the Unauthorized Blob Overwrite error when the destination file
already exists; invalid permissions produce separate errors. We can remove the
"unauthorized overwrite" parts here.
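The reviewer's point can be illustrated with a minimal sketch of how a failed rename's storage error code might be mapped to an exception. It assumes the service error-code string is literally `UnauthorizedBlobOverwrite` and uses `java.nio.file.FileAlreadyExistsException` as a stand-in for Hadoop's own `FileAlreadyExistsException`; `RenameErrorMapper` and `toRenameException` are hypothetical names, not part of the ABFS driver.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;

/** Hypothetical helper: maps the storage error code of a failed rename to an exception. */
public final class RenameErrorMapper {

  // Assumed error-code string; the real driver keeps its codes in an enum.
  static final String UNAUTHORIZED_BLOB_OVERWRITE = "UnauthorizedBlobOverwrite";

  private RenameErrorMapper() {
  }

  /**
   * UnauthorizedBlobOverwrite is returned only when the destination already
   * exists, so it maps to "already exists" rather than to a permission failure.
   * Permission problems arrive with other error codes and are passed through
   * here as plain IOExceptions.
   */
  static IOException toRenameException(String destination, String errorCode) {
    if (UNAUTHORIZED_BLOB_OVERWRITE.equalsIgnoreCase(errorCode)) {
      return new FileAlreadyExistsException(destination);
    }
    return new IOException("Rename to " + destination + " failed: " + errorCode);
  }

  public static void main(String[] args) {
    IOException overwrite = toRenameException("/dst/file", "UnauthorizedBlobOverwrite");
    // Example of a permission-related code (assumed for illustration).
    IOException permission = toRenameException("/dst/file", "AuthorizationPermissionMismatch");
    System.out.println(overwrite.getClass().getSimpleName());  // FileAlreadyExistsException
    System.out.println(permission.getClass().getSimpleName()); // IOException
  }
}
```

Keeping the mapping keyed on the error code alone means the Javadoc only needs to describe the "destination already exists" case.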
> [ABFS] Enable rename and create recovery from client transaction id over DFS endpoint
> -------------------------------------------------------------------------------------
>
> Key: HADOOP-19497
> URL: https://issues.apache.org/jira/browse/HADOOP-19497
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0
> Reporter: Manish Bhatt
> Assignee: Manish Bhatt
> Priority: Major
> Labels: pull-request-available
>
> We have implemented create and rename recovery using client transaction IDs
> over the DFS endpoint ([HADOOP-19450] [ABFS] Rename/Create path idempotency
> client-level resolution - ASF JIRA). Because the backend changes had not been
> fully rolled out, we initially shipped the changes with the flag disabled.
> With this update, we aim to enable the flag, so the client will start sending
> client transaction IDs. If a request fails, we will attempt to recover from
> the failed state where possible. Here are the detailed steps and
> considerations for this process:
> 1. **Implementation Overview**: We introduced a mechanism for create and
> rename recovery via client transaction IDs to enhance reliability and data
> integrity over the DFS endpoint. This change was initially flagged as
> disabled due to incomplete backend rollouts.
> 2. **Current Update**: With the backend changes now in place, we are ready to
> enable the flag. This will activate the sending of client transaction IDs,
> allowing us to track and manage transactions more effectively.
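> 3. **Failure Recovery**: The primary advantage of enabling this flag is the
> ability to recover from failed states. For example, when a retried rename
> fails because the source path no longer exists, the client can compare the
> transaction ID recorded on the destination with the one it sent; a match
> means the original request already succeeded, so the rename is reported as
> recovered instead of failed.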
> 4. **Next Steps**: We will proceed with enabling the flag and closely monitor
> the system's performance. Any issues or failures will be documented and
> addressed promptly to ensure a smooth transition.
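The recovery flow described above, and implemented by `renameWithCTIdRecovery` and `recoveryUsingCTId` in the diff, can be modelled with a small in-memory sketch. It assumes the service records the client transaction ID on the destination path and returns it on a later status call (the diff reads it via `X_MS_CLIENT_TRANSACTION_ID`). `CtIdRenameSketch`, `InMemoryStore`, `renameWithRecovery`, and the `loseFirstResponse` flag are hypothetical stand-ins for the HTTP layer and retry policy, not ABFS APIs; the sketch also ignores `If-None-Match: *` and the ETag fallback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public final class CtIdRenameSketch {

  /** Hypothetical store: path -> client transaction ID recorded by the rename that created it. */
  static final class InMemoryStore {
    final Map<String, String> paths = new HashMap<>();

    /** Returns true on success, false when the source is missing (SourcePathNotFound). */
    boolean rename(String src, String dst, String ctId) {
      if (!paths.containsKey(src)) {
        return false;
      }
      paths.remove(src);
      paths.put(dst, ctId); // the server stamps the destination with the transaction ID
      return true;
    }

    /** Stands in for a getPathStatus call reading the transaction-ID response header. */
    String transactionIdOf(String path) {
      return paths.get(path);
    }
  }

  /**
   * Sends a rename with a fresh transaction ID. If the first response is lost,
   * the client retries with the same ID; a retry failing with source-not-found
   * counts as success only when the destination carries our own ID.
   */
  static boolean renameWithRecovery(InMemoryStore store, String src, String dst,
      boolean loseFirstResponse) {
    String ctId = UUID.randomUUID().toString();
    boolean firstOk = store.rename(src, dst, ctId);
    if (!loseFirstResponse) {
      return firstOk; // the client saw the real outcome
    }
    if (store.rename(src, dst, ctId)) {
      return true;
    }
    return ctId.equals(store.transactionIdOf(dst));
  }

  public static void main(String[] args) {
    InMemoryStore store = new InMemoryStore();
    store.paths.put("/src/a", null);
    System.out.println(renameWithRecovery(store, "/src/a", "/dst/a", true)); // true (recovered)

    InMemoryStore other = new InMemoryStore();
    other.paths.put("/dst/b", "another-client-id"); // destination written by someone else
    System.out.println(renameWithRecovery(other, "/src/b", "/dst/b", true)); // false
  }
}
```

Comparing transaction IDs lets the client tell "my earlier attempt succeeded" apart from "another writer produced this destination", which an existence check alone cannot do.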
--
This message was sent by Atlassian Jira
(v8.20.10#820010)