This is an automated email from the ASF dual-hosted git repository. stevel pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new f324efd2472 HADOOP-18012. ABFS: Enable config controlled ETag check for Rename idempotency (#5488) f324efd2472 is described below commit f324efd2472006f20426c95c8e5d694072d6e3dd Author: sreeb-msft <111426823+sreeb-m...@users.noreply.github.com> AuthorDate: Fri Mar 31 23:45:15 2023 +0530 HADOOP-18012. ABFS: Enable config controlled ETag check for Rename idempotency (#5488) To support recovery of network failures during rename, the abfs client fetches the etag of the source file, and when recovering from a failure, uses this tag to determine whether the rename succeeded before the failure happened. * This works for files, but not directories * It adds the overhead of a HEAD request before each rename. * The option can be disabled by setting "fs.azure.enable.rename.resilience" to false Contributed by Sree Bhattacharyya --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 11 + .../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 6 +- .../fs/azurebfs/AzureBlobFileSystemStore.java | 4 +- .../fs/azurebfs/constants/ConfigurationKeys.java | 3 + .../constants/FileSystemConfigurations.java | 1 + .../hadoop/fs/azurebfs/services/AbfsClient.java | 201 ++++++---- .../azurebfs/services/AbfsClientRenameResult.java | 12 + .../fs/azurebfs/services/AbfsRestOperation.java | 53 ++- .../ITestAzureBlobFileSystemDelegationSAS.java | 8 +- .../fs/azurebfs/ITestCustomerProvidedKey.java | 6 +- .../services/TestAbfsRenameRetryRecovery.java | 414 ++++++++++++++++++++- 11 files changed, 622 insertions(+), 97 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 69298079a57..1bf7c569da1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -333,6 +333,10 @@ public class AbfsConfiguration{ FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR) private boolean enableAbfsListIterator; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE) + private boolean renameResilience; + public AbfsConfiguration(final Configuration rawConfig, String accountName) throws IllegalAccessException, InvalidConfigurationValueException, IOException { this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders( @@ -1139,4 +1143,11 @@ public class AbfsConfiguration{ this.enableAbfsListIterator = enableAbfsListIterator; } + public boolean getRenameResilience() { + return renameResilience; + } + + void setRenameResilience(boolean actualResilience) { + renameResilience = actualResilience; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 5534b5fb44a..9c9d6f561d7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -201,9 +201,9 @@ public class AzureBlobFileSystem extends FileSystem tracingHeaderFormat = abfsConfiguration.getTracingHeaderFormat(); this.setWorkingDirectory(this.getHomeDirectory()); + TracingContext tracingContext = new TracingContext(clientCorrelationId, + fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener); if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) { - TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener); if 
(this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) { try { this.createFileSystem(tracingContext); @@ -442,7 +442,7 @@ public class AzureBlobFileSystem extends FileSystem } // Non-HNS account need to check dst status on driver side. - if (!abfsStore.getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) { + if (!getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) { dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 70ba9cd579a..cd33da401c9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -923,9 +923,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { do { try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) { + boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext); final AbfsClientRenameResult abfsClientRenameResult = client.renamePath(sourceRelativePath, destinationRelativePath, - continuation, tracingContext, sourceEtag, false); + continuation, tracingContext, sourceEtag, false, + isNamespaceEnabled); AbfsRestOperation op = abfsClientRenameResult.getOp(); perfInfo.registerResult(op.getResult()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index e3052cd7bbc..872364a8e61 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -238,6 +238,9 @@ public final class ConfigurationKeys { /** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */ public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit"; + /** Add extra resilience to rename failures, at the expense of performance. */ + public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience"; + public static String accountProperty(String property, String account) { return property + "." + account; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 68b492a5791..32f9966e30a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -118,6 +118,7 @@ public final class FileSystemConfigurations { public static final int STREAM_ID_LEN = 12; public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true; + public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true; /** * Limit of queued block upload operations before writes diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 2e497b54975..1767274f360 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory; import 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException; @@ -68,6 +69,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS; import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; @@ -77,8 +79,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.S import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; /** * AbfsClient. @@ -106,9 +108,12 @@ public class AbfsClient implements Closeable { private final ListeningScheduledExecutorService executorService; - /** logging the rename failure if metadata is in an incomplete state. 
*/ - private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = - new LogExactlyOnce(LOG); + private boolean renameResilience; + + /** + * logging the rename failure if metadata is in an incomplete state. + */ + private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = new LogExactlyOnce(LOG); private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, @@ -123,6 +128,7 @@ public class AbfsClient implements Closeable { this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT)); this.authType = abfsConfiguration.getAuthType(accountName); this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration); + this.renameResilience = abfsConfiguration.getRenameResilience(); String encryptionKey = this.abfsConfiguration .getClientProvidedEncryptionKey(); @@ -504,27 +510,55 @@ public class AbfsClient implements Closeable { * took place. * As rename recovery is only attempted if the source etag is non-empty, * in normal rename operations rename recovery will never happen. - * @param source path to source file - * @param destination destination of rename. - * @param continuation continuation. - * @param tracingContext trace context - * @param sourceEtag etag of source file. may be null or empty + * + * @param source path to source file + * @param destination destination of rename. + * @param continuation continuation. + * @param tracingContext trace context + * @param sourceEtag etag of source file. may be null or empty * @param isMetadataIncompleteState was there a rename failure due to * incomplete metadata state? + * @param isNamespaceEnabled whether namespace enabled account or not * @return AbfsClientRenameResult result of rename operation indicating the * AbfsRest operation, rename recovery and incomplete metadata state failure. 
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures. */ public AbfsClientRenameResult renamePath( - final String source, - final String destination, - final String continuation, - final TracingContext tracingContext, - final String sourceEtag, - boolean isMetadataIncompleteState) + final String source, + final String destination, + final String continuation, + final TracingContext tracingContext, + String sourceEtag, + boolean isMetadataIncompleteState, + boolean isNamespaceEnabled) throws AzureBlobFileSystemException { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + final boolean hasEtag = !isEmpty(sourceEtag); + + boolean shouldAttemptRecovery = renameResilience && isNamespaceEnabled; + if (!hasEtag && shouldAttemptRecovery) { + // in case eTag is already not supplied to the API + // and rename resilience is expected and it is an HNS enabled account + // fetch the source etag to be used later in recovery + try { + final AbfsRestOperation srcStatusOp = getPathStatus(source, + false, tracingContext); + if (srcStatusOp.hasResult()) { + final AbfsHttpOperation result = srcStatusOp.getResult(); + sourceEtag = extractEtagHeader(result); + // and update the directory status. 
+ boolean isDir = checkIsDir(result); + shouldAttemptRecovery = !isDir; + LOG.debug("Retrieved etag of source for rename recovery: {}; isDir={}", sourceEtag, isDir); + } + } catch (AbfsRestOperationException e) { + throw new AbfsRestOperationException(e.getStatusCode(), SOURCE_PATH_NOT_FOUND.getErrorCode(), + e.getMessage(), e); + } + + } + String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source); if (authType == AuthType.SAS) { final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder(); @@ -541,12 +575,7 @@ public class AbfsClient implements Closeable { appendSASTokenToQuery(destination, SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder); final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = new AbfsRestOperation( - AbfsRestOperationType.RenamePath, - this, - HTTP_METHOD_PUT, - url, - requestHeaders); + final AbfsRestOperation op = createRenameRestOperation(url, requestHeaders); try { incrementAbfsRenamePath(); op.execute(tracingContext); @@ -557,48 +586,74 @@ public class AbfsClient implements Closeable { // isMetadataIncompleteState is used for renameRecovery(as the 2nd param). return new AbfsClientRenameResult(op, isMetadataIncompleteState, isMetadataIncompleteState); } catch (AzureBlobFileSystemException e) { - // If we have no HTTP response, throw the original exception. - if (!op.hasResult()) { - throw e; - } - - // ref: HADOOP-18242. Rename failure occurring due to a rare case of - // tracking metadata being in incomplete state. - if (op.getResult().getStorageErrorCode() - .equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode()) - && !isMetadataIncompleteState) { - // Logging once - ABFS_METADATA_INCOMPLETE_RENAME_FAILURE - .info("Rename Failure attempting to resolve tracking metadata state and retrying."); + // If we have no HTTP response, throw the original exception. + if (!op.hasResult()) { + throw e; + } + // ref: HADOOP-18242. 
Rename failure occurring due to a rare case of + // tracking metadata being in incomplete state. + if (op.getResult().getStorageErrorCode() + .equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode()) + && !isMetadataIncompleteState) { + //Logging + ABFS_METADATA_INCOMPLETE_RENAME_FAILURE + .info("Rename Failure attempting to resolve tracking metadata state and retrying."); + // rename recovery should be attempted in this case also + shouldAttemptRecovery = true; + isMetadataIncompleteState = true; + String sourceEtagAfterFailure = sourceEtag; + if (isEmpty(sourceEtagAfterFailure)) { // Doing a HEAD call resolves the incomplete metadata state and // then we can retry the rename operation. AbfsRestOperation sourceStatusOp = getPathStatus(source, false, tracingContext); - isMetadataIncompleteState = true; // Extract the sourceEtag, using the status Op, and set it // for future rename recovery. AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult(); - String sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult); - renamePath(source, destination, continuation, tracingContext, - sourceEtagAfterFailure, isMetadataIncompleteState); - } - // if we get out of the condition without a successful rename, then - // it isn't metadata incomplete state issue. - isMetadataIncompleteState = false; - - boolean etagCheckSucceeded = renameIdempotencyCheckOp( - source, - sourceEtag, op, destination, tracingContext); - if (!etagCheckSucceeded) { - // idempotency did not return different result - // throw back the exception - throw e; + sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult); } + renamePath(source, destination, continuation, tracingContext, + sourceEtagAfterFailure, isMetadataIncompleteState, isNamespaceEnabled); + } + // if we get out of the condition without a successful rename, then + // it isn't metadata incomplete state issue. 
+ isMetadataIncompleteState = false; + + // setting default rename recovery success to false + boolean etagCheckSucceeded = false; + if (shouldAttemptRecovery) { + etagCheckSucceeded = renameIdempotencyCheckOp( + source, + sourceEtag, op, destination, tracingContext); + } + if (!etagCheckSucceeded) { + // idempotency did not return different result + // throw back the exception + throw e; + } return new AbfsClientRenameResult(op, true, isMetadataIncompleteState); } } + private boolean checkIsDir(AbfsHttpOperation result) { + String resourceType = result.getResponseHeader( + HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + return resourceType != null + && resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); + } + + @VisibleForTesting + AbfsRestOperation createRenameRestOperation(URL url, List<AbfsHttpHeader> requestHeaders) { + AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.RenamePath, + this, + HTTP_METHOD_PUT, + url, + requestHeaders); + return op; + } + private void incrementAbfsRenamePath() { abfsCounters.incrementCounter(RENAME_PATH_ATTEMPTS, 1); } @@ -628,28 +683,44 @@ public class AbfsClient implements Closeable { TracingContext tracingContext) { Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response"); - if ((op.isARetriedRequest()) - && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) - && isNotEmpty(sourceEtag)) { - - // Server has returned HTTP 404, which means rename source no longer - // exists. Check on destination status and if its etag matches - // that of the source, consider it to be a success. 
- LOG.debug("rename {} to {} failed, checking etag of destination", - source, destination); + // removing isDir from debug logs as it can be misleading + LOG.debug("rename({}, {}) failure {}; retry={} etag {}", + source, destination, op.getResult().getStatusCode(), op.isARetriedRequest(), sourceEtag); + if (!(op.isARetriedRequest() + && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND))) { + // only attempt recovery if the failure was a 404 on a retried rename request. + return false; + } + if (isNotEmpty(sourceEtag)) { + // Server has returned HTTP 404, we have an etag, so see + // if the rename has actually taken place, + LOG.info("rename {} to {} failed, checking etag of destination", + source, destination); try { - final AbfsRestOperation destStatusOp = getPathStatus(destination, - false, tracingContext); + final AbfsRestOperation destStatusOp = getPathStatus(destination, false, tracingContext); final AbfsHttpOperation result = destStatusOp.getResult(); - return result.getStatusCode() == HttpURLConnection.HTTP_OK - && sourceEtag.equals(extractEtagHeader(result)); - } catch (AzureBlobFileSystemException ignored) { + final boolean recovered = result.getStatusCode() == HttpURLConnection.HTTP_OK + && sourceEtag.equals(extractEtagHeader(result)); + LOG.info("File rename has taken place: recovery {}", + recovered ? "succeeded" : "failed"); + return recovered; + + } catch (AzureBlobFileSystemException ex) { // GetFileStatus on the destination failed, the rename did not take place + // or some other failure. log and swallow. 
+ LOG.debug("Failed to get status of path {}", destination, ex); } + } else { + LOG.debug("No source etag; unable to probe for the operation's success"); } - return false; + return false; + } + + @VisibleForTesting + boolean isSourceDestEtagEqual(String sourceEtag, AbfsHttpOperation result) { + return sourceEtag.equals(extractEtagHeader(result)); } public AbfsRestOperation append(final String path, final byte[] buffer, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java index 86e3473a9fe..76648cfc44b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java @@ -58,4 +58,16 @@ public class AbfsClientRenameResult { public boolean isIncompleteMetadataState() { return isIncompleteMetadataState; } + + @Override + public String toString() { + return "AbfsClientRenameResult{" + + "op=" + + op + + ", renameRecovered=" + + renameRecovered + + ", isIncompleteMetadataState=" + + isIncompleteMetadataState + + '}'; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 362e0d1213f..df58df437c1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -277,26 +277,8 @@ public class AbfsRestOperation { incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1); tracingContext.constructHeader(httpOperation, failureReason); - switch(client.getAuthType()) { - case Custom: - case OAuth: - 
LOG.debug("Authenticating request with OAuth2 access token"); - httpOperation.setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, - client.getAccessToken()); - break; - case SAS: - // do nothing; the SAS token should already be appended to the query string - httpOperation.setMaskForSAS(); //mask sig/oid from url for logs - break; - case SharedKey: - // sign the HTTP request - LOG.debug("Signing request with shared key"); - // sign the HTTP request - client.getSharedKeyCredentials().signRequest( - httpOperation.getConnection(), - hasRequestBody ? bufferLength : 0); - break; - } + signRequest(httpOperation, hasRequestBody ? bufferLength : 0); + } catch (IOException e) { LOG.debug("Auth failure: {}, {}", method, url); throw new AbfsRestOperationException(-1, null, @@ -377,6 +359,37 @@ public class AbfsRestOperation { return true; } + /** + * Sign an operation. + * @param httpOperation operation to sign + * @param bytesToSign how many bytes to sign for shared key auth. + * @throws IOException failure + */ + @VisibleForTesting + public void signRequest(final AbfsHttpOperation httpOperation, int bytesToSign) throws IOException { + switch(client.getAuthType()) { + case Custom: + case OAuth: + LOG.debug("Authenticating request with OAuth2 access token"); + httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + client.getAccessToken()); + break; + case SAS: + // do nothing; the SAS token should already be appended to the query string + httpOperation.setMaskForSAS(); //mask sig/oid from url for logs + break; + case SharedKey: + default: + // sign the HTTP request + LOG.debug("Signing request with shared key"); + // sign the HTTP request + client.getSharedKeyCredentials().signRequest( + httpOperation.getConnection(), + bytesToSign); + break; + } + } + /** * Creates new object of {@link AbfsHttpOperation} with the url, method, and * requestHeaders fields of the AbfsRestOperation object. 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java index b164689ef80..5735423aaf9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java @@ -70,6 +70,8 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati private static final Logger LOG = LoggerFactory.getLogger(ITestAzureBlobFileSystemDelegationSAS.class); + private boolean isHNSEnabled; + public ITestAzureBlobFileSystemDelegationSAS() throws Exception { // These tests rely on specific settings in azure-auth-keys.xml: String sasProvider = getRawConfiguration().get(FS_AZURE_SAS_TOKEN_PROVIDER_TYPE); @@ -85,7 +87,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati @Override public void setup() throws Exception { - boolean isHNSEnabled = this.getConfiguration().getBoolean( + isHNSEnabled = this.getConfiguration().getBoolean( TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false); Assume.assumeTrue(isHNSEnabled); createFilesystemForSASTests(); @@ -401,7 +403,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati fs.create(new Path(src)).close(); AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient() .renamePath(src, "/testABC" + "/abc.txt", null, - getTestTracingContext(fs, false), null, false) + getTestTracingContext(fs, false), null, false, isHNSEnabled) .getOp(); AbfsHttpOperation result = abfsHttpRestOperation.getResult(); String url = result.getMaskedUrl(); @@ -419,7 +421,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati intercept(IOException.class, "sig=XXXX", () -> 
getFileSystem().getAbfsClient() .renamePath("testABC/test.xt", "testABC/abc.txt", null, - getTestTracingContext(getFileSystem(), false), null, false)); + getTestTracingContext(getFileSystem(), false), null, false, isHNSEnabled)); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java index 6496c9234f3..c2d7f80d373 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java @@ -99,10 +99,14 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest { private static final int FILE_SIZE = 10 * ONE_MB; private static final int FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS = 24 * ONE_MB; + private boolean isNamespaceEnabled; + public ITestCustomerProvidedKey() throws Exception { boolean isCPKTestsEnabled = getConfiguration() .getBoolean(FS_AZURE_TEST_CPK_ENABLED, false); Assume.assumeTrue(isCPKTestsEnabled); + isNamespaceEnabled = getConfiguration() + .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false); } @Test @@ -526,7 +530,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest { AbfsClient abfsClient = fs.getAbfsClient(); AbfsRestOperation abfsRestOperation = abfsClient .renamePath(testFileName, newName, null, - getTestTracingContext(fs, false), null, false) + getTestTracingContext(fs, false), null, false, isNamespaceEnabled) .getOp(); assertCPKHeaders(abfsRestOperation, false); assertNoCPKResponseHeadersPresent(abfsRestOperation); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index f5cbceaddd8..cef1c9ae5a1 100644 --- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -18,19 +18,44 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.SocketException; +import java.net.URL; +import java.time.Duration; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys; +import org.apache.hadoop.fs.statistics.IOStatistics; import org.assertj.core.api.Assertions; +import org.junit.Assume; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.EtagSource; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; +import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_ALREADY_EXISTS; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; +import static 
org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -45,7 +70,11 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest { private static final Logger LOG = LoggerFactory.getLogger(TestAbfsRenameRetryRecovery.class); + private boolean isNamespaceEnabled; + public TestAbfsRenameRetryRecovery() throws Exception { + isNamespaceEnabled = getConfiguration() + .getBoolean(TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false); } /** @@ -90,7 +119,7 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest { // We need to throw an exception once a rename is triggered with // destination having no parent, but after a retry it needs to succeed. 
when(mockClient.renamePath(sourcePath, destNoParentPath, null, null, - null, false)) + null, false, isNamespaceEnabled)) .thenThrow(destParentNotFound) .thenReturn(recoveredMetaDataIncompleteResult); @@ -98,12 +127,12 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest { intercept(AzureBlobFileSystemException.class, () -> mockClient.renamePath(sourcePath, destNoParentPath, null, null, - null, false)); + null, false, isNamespaceEnabled)); AbfsClientRenameResult resultOfSecondRenameCall = mockClient.renamePath(sourcePath, destNoParentPath, null, null, - null, false); + null, false, isNamespaceEnabled); // the second rename call should be the recoveredResult due to // metaDataIncomplete @@ -119,8 +148,385 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest { // Verify renamePath occurred two times implying a retry was attempted. verify(mockClient, times(2)) - .renamePath(sourcePath, destNoParentPath, null, null, null, false); + .renamePath(sourcePath, destNoParentPath, null, null, null, false, + isNamespaceEnabled); + + } + + AbfsClient getMockAbfsClient() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + + // adding mock objects to current AbfsClient + AbfsClient spyClient = Mockito.spy(fs.getAbfsStore().getClient()); + + Mockito.doAnswer(answer -> { + AbfsRestOperation op = new AbfsRestOperation(AbfsRestOperationType.RenamePath, + spyClient, HTTP_METHOD_PUT, answer.getArgument(0), answer.getArgument(1)); + AbfsRestOperation spiedOp = Mockito.spy(op); + addSpyBehavior(spiedOp, op, spyClient); + return spiedOp; + }).when(spyClient).createRenameRestOperation(Mockito.any(URL.class), anyList()); + + return spyClient; + + } + + /** + * Spies on a rest operation to inject transient failure. + * the first createHttpOperation() invocation will return an abfs rest operation + * which will fail. 
+ * @param spiedRestOp spied operation whose createHttpOperation() will fail the first time + * @param normalRestOp the normal (good) operation + * @param client client. + * @throws IOException failure + */ + private void addSpyBehavior(final AbfsRestOperation spiedRestOp, + final AbfsRestOperation normalRestOp, + final AbfsClient client) + throws IOException { + AbfsHttpOperation failingOperation = Mockito.spy(normalRestOp.createHttpOperation()); + AbfsHttpOperation normalOp1 = normalRestOp.createHttpOperation(); + executeThenFail(client, normalRestOp, failingOperation, normalOp1); + AbfsHttpOperation normalOp2 = normalRestOp.createHttpOperation(); + normalOp2.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + client.getAccessToken()); + + when(spiedRestOp.createHttpOperation()) + .thenReturn(failingOperation) + .thenReturn(normalOp2); + } + + /** + * Mock an idempotency failure by executing the normal operation, then + * raising an IOE. + * @param normalRestOp the rest operation used to sign the requests. 
+ * @param failingOperation failing operation + * @param normalOp good operation + * @throws IOException failure + */ + private void executeThenFail(final AbfsClient client, + final AbfsRestOperation normalRestOp, + final AbfsHttpOperation failingOperation, + final AbfsHttpOperation normalOp) + throws IOException { + + Mockito.doAnswer(answer -> { + LOG.info("Executing first attempt with post-operation fault injection"); + final byte[] buffer = answer.getArgument(0); + final int offset = answer.getArgument(1); + final int length = answer.getArgument(2); + normalRestOp.signRequest(normalOp, length); + normalOp.sendRequest(buffer, offset, length); + normalOp.processResponse(buffer, offset, length); + LOG.info("Actual outcome is {} \"{}\" \"{}\"; injecting failure", + normalOp.getStatusCode(), + normalOp.getStorageErrorCode(), + normalOp.getStorageErrorMessage()); + throw new SocketException("connection-reset"); + }).when(failingOperation).sendRequest(Mockito.nullable(byte[].class), + Mockito.nullable(int.class), Mockito.nullable(int.class)); + + } + + /** + * This is the good outcome: resilient rename. 
+ */ + @Test + public void testRenameRecoveryEtagMatchFsLevel() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + AzureBlobFileSystemStore abfsStore = fs.getAbfsStore(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient mockClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyFile1"; + String path2 = base + "/dummyFile2"; + + touch(new Path(path1)); + + setAbfsClient(abfsStore, mockClient); + + // checking correct count in AbfsCounters + AbfsCounters counter = mockClient.getAbfsCounters(); + IOStatistics ioStats = counter.getIOStatistics(); + + Long connMadeBeforeRename = lookupCounterStatistic(ioStats, CONNECTIONS_MADE.getStatName()); + Long renamePathAttemptsBeforeRename = lookupCounterStatistic(ioStats, RENAME_PATH_ATTEMPTS.getStatName()); + + // 404 and retry, send sourceEtag as null + // source eTag matches -> rename should pass even when execute throws exception + fs.rename(new Path(path1), new Path(path2)); + + // validating stat counters after rename + // 4 calls should have happened in total for rename + // 1 -> original rename rest call, 2 -> first retry, + // +2 for getPathStatus calls + assertThatStatisticCounter(ioStats, + CONNECTIONS_MADE.getStatName()) + .isEqualTo(4 + connMadeBeforeRename); + // the RENAME_PATH_ATTEMPTS stat should be incremented by 1 + // retries happen internally within AbfsRestOperation execute() + // the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called + assertThatStatisticCounter(ioStats, + RENAME_PATH_ATTEMPTS.getStatName()) + .isEqualTo(1 + renamePathAttemptsBeforeRename); + + } + + /** + * execute a failing rename but have the file at the far end not match. + * This is done by explicitly passing in a made up etag for the source + * etag and creating a file at the far end. 
+ * The first rename will actually fail with a path exists exception, + * but as that is swallowed, it's not a problem. + */ + @Test + public void testRenameRecoveryEtagMismatchFsLevel() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + AzureBlobFileSystemStore abfsStore = fs.getAbfsStore(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient mockClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyFile1"; + String path2 = base + "/dummyFile2"; + + fs.create(new Path(path2)); + + setAbfsClient(abfsStore, mockClient); + + // source eTag does not match -> rename should be a failure + assertEquals(false, fs.rename(new Path(path1), new Path(path2))); + + } + + @Test + public void testRenameRecoveryFailsForDirFsLevel() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + AzureBlobFileSystemStore abfsStore = fs.getAbfsStore(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient mockClient = getMockAbfsClient(); + + String dir1 = "/dummyDir1"; + String dir2 = "/dummyDir2"; + + Path path1 = new Path(dir1); + Path path2 = new Path(dir2); + + fs.mkdirs(path1); + + setAbfsClient(abfsStore, mockClient); + + // checking correct count in AbfsCounters + AbfsCounters counter = mockClient.getAbfsCounters(); + IOStatistics ioStats = counter.getIOStatistics(); + + Long connMadeBeforeRename = lookupCounterStatistic(ioStats, CONNECTIONS_MADE.getStatName()); + Long renamePathAttemptsBeforeRename = lookupCounterStatistic(ioStats, RENAME_PATH_ATTEMPTS.getStatName()); + + // source eTag does not match -> rename should be a failure + boolean renameResult = fs.rename(path1, path2); + assertEquals(false, renameResult); + + // validating stat counters after rename + // 3 calls should 
have happened in total for rename + // 1 -> original rename rest call, 2 -> first retry, + // +1 for getPathStatus calls + // last getPathStatus call should be skipped + assertThatStatisticCounter(ioStats, + CONNECTIONS_MADE.getStatName()) + .isEqualTo(3 + connMadeBeforeRename); + + // the RENAME_PATH_ATTEMPTS stat should be incremented by 1 + // retries happen internally within AbfsRestOperation execute() + // the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called + assertThatStatisticCounter(ioStats, + RENAME_PATH_ATTEMPTS.getStatName()) + .isEqualTo(1 + renamePathAttemptsBeforeRename); + } + + /** + * Assert that an exception failed with a specific error code. + * @param code code + * @param e exception + * @throws AbfsRestOperationException if there is a mismatch + */ + private static void expectErrorCode(final AzureServiceErrorCode code, + final AbfsRestOperationException e) throws AbfsRestOperationException { + if (e.getErrorCode() != code) { + throw e; + } + } + + /** + * Directory rename failure is unrecoverable. 
+ */ + @Test + public void testDirRenameRecoveryUnsupported() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient spyClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + fs.mkdirs(new Path(path1)); + + // source is a directory, so recovery is unsupported -> throw exception + expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () -> + spyClient.renamePath(path1, path2, null, testTracingContext, null, false, + isNamespaceEnabled))); + } + + /** + * Even with injected failures, a rename onto an existing destination is correctly rejected. + */ + @Test + public void testExistingPathCorrectlyRejected() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient spyClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + + touch(new Path(path1)); + touch(new Path(path2)); + + // destination already exists -> throw exception + expectErrorCode(PATH_ALREADY_EXISTS, intercept(AbfsRestOperationException.class, () -> + spyClient.renamePath(path1, path2, null, testTracingContext, null, false, + isNamespaceEnabled))); + } + + /** + * Test that rename recovery remains unsupported for + * FNS configurations. 
+ */ + @Test + public void testRenameRecoveryUnsupportedForFlatNamespace() throws Exception { + Assume.assumeTrue(!isNamespaceEnabled); + AzureBlobFileSystem fs = getFileSystem(); + AzureBlobFileSystemStore abfsStore = fs.getAbfsStore(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + AbfsClient mockClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyFile1"; + String path2 = base + "/dummyFile2"; + + touch(new Path(path1)); + + setAbfsClient(abfsStore, mockClient); + + // checking correct count in AbfsCounters + AbfsCounters counter = mockClient.getAbfsCounters(); + IOStatistics ioStats = counter.getIOStatistics(); + + Long connMadeBeforeRename = lookupCounterStatistic(ioStats, CONNECTIONS_MADE.getStatName()); + Long renamePathAttemptsBeforeRename = lookupCounterStatistic(ioStats, RENAME_PATH_ATTEMPTS.getStatName()); + + expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () -> + mockClient.renamePath(path1, path2, null, testTracingContext, null, false, + isNamespaceEnabled))); + + // validating stat counters after rename + + // only 2 calls should have happened in total for rename + // 1 -> original rename rest call, 2 -> first retry, + // no getPathStatus calls + // last getPathStatus call should be skipped + assertThatStatisticCounter(ioStats, + CONNECTIONS_MADE.getStatName()) + .isEqualTo(2 + connMadeBeforeRename); + + // the RENAME_PATH_ATTEMPTS stat should be incremented by 1 + // retries happen internally within AbfsRestOperation execute() + // the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called + assertThatStatisticCounter(ioStats, + RENAME_PATH_ATTEMPTS.getStatName()) + .isEqualTo(1 + renamePathAttemptsBeforeRename); + } + + /** + * Test the resilient commit code works through fault injection, including + * reporting recovery. 
+ */ + @Test + public void testResilientCommitOperation() throws Throwable { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + final AzureBlobFileSystemStore store = fs.getAbfsStore(); + Assume.assumeTrue(store.getIsNamespaceEnabled(testTracingContext)); + + // patch in the mock abfs client to the filesystem, for the resilient + // commit API to pick up. + setAbfsClient(store, getMockAbfsClient()); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + + final Path source = new Path(path1); + touch(source); + final String sourceTag = ((EtagSource) fs.getFileStatus(source)).getEtag(); + + final ResilientCommitByRename commit = fs.createResilientCommitSupport(source); + final Pair<Boolean, Duration> outcome = + commit.commitSingleFileByRename(source, new Path(path2), sourceTag); + Assertions.assertThat(outcome.getKey()) + .describedAs("recovery flag") + .isTrue(); + } + /** + * Test that the resilient commit fails with a FileNotFoundException when + * the etag passed in does not match that of the source. + */ + @Test + public void testResilientCommitOperationTagMismatch() throws Throwable { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + final AzureBlobFileSystemStore store = fs.getAbfsStore(); + Assume.assumeTrue(store.getIsNamespaceEnabled(testTracingContext)); + + // patch in the mock abfs client to the filesystem, for the resilient + // commit API to pick up. 
+ setAbfsClient(store, getMockAbfsClient()); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + + final Path source = new Path(path1); + touch(source); + final String sourceTag = ((EtagSource) fs.getFileStatus(source)).getEtag(); + final ResilientCommitByRename commit = fs.createResilientCommitSupport(source); + intercept(FileNotFoundException.class, () -> + commit.commitSingleFileByRename(source, new Path(path2), "not the right tag")); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org