This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 791270a  HADOOP-16730: ABFS: Support for Shared Access Signatures 
(SAS). Contributed by Sneha Vijayarajan.
791270a is described below

commit 791270a2e5e31546ff5c1ef4fa8bad6852b906dc
Author: Sneha Vijayarajan <snvij...@microsoft.com>
AuthorDate: Thu Feb 27 17:00:15 2020 +0000

    HADOOP-16730: ABFS: Support for Shared Access Signatures (SAS). Contributed 
by Sneha Vijayarajan.
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  70 ++--
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  58 +--
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  19 +-
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   3 +-
 .../exceptions/SASTokenProviderException.java}     |  23 +-
 .../fs/azurebfs/extensions/AbfsAuthorizer.java     |  57 ---
 .../fs/azurebfs/extensions/SASTokenProvider.java   |  74 ++++
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  96 ++++-
 .../fs/azurebfs/services/AbfsRestOperation.java    |  30 +-
 .../fs/azurebfs/services/AbfsUriQueryBuilder.java  |  17 +-
 .../hadoop/fs/azurebfs/services/AuthType.java      |   3 +-
 .../hadoop-azure/src/site/markdown/abfs.md         |   2 +-
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |  60 ++-
 .../fs/azurebfs/ITestAbfsIdentityTransformer.java  |   1 -
 .../ITestAzureBlobFileSystemAuthorization.java     | 402 ++++++++++-----------
 .../azurebfs/constants/TestConfigurationKeys.java  |   3 +
 .../fs/azurebfs/extensions/MockAbfsAuthorizer.java |  87 -----
 .../extensions/MockErrorSASTokenProvider.java      |  63 ++++
 .../azurebfs/extensions/MockSASTokenProvider.java  |  85 +++++
 .../fs/azurebfs/services/TestAbfsClient.java       |   9 +-
 .../hadoop/fs/azurebfs/utils/SASGenerator.java     | 129 +++++++
 .../hadoop-azure/src/test/resources/azure-test.xml |   2 +-
 22 files changed, 799 insertions(+), 494 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 81e4191..779f524 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.fs.azurebfs;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,15 +40,15 @@ import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
 import 
org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator;
 import 
org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator;
 import 
org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
 import 
org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
 import 
org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
-import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizationException;
-import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizer;
 import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdapter;
@@ -170,9 +170,6 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_ENABLE_DELEGATION_TOKEN)
   private boolean enableDelegationToken;
 
-  @StringConfigurationValidatorAnnotation(ConfigurationKey = 
ABFS_EXTERNAL_AUTHORIZATION_CLASS,
-      DefaultValue = "")
-  private String abfsExternalAuthorizationClass;
 
   @BooleanConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_ALWAYS_USE_HTTPS,
           DefaultValue = DEFAULT_ENABLE_HTTPS)
@@ -218,6 +215,14 @@ public class AbfsConfiguration{
   }
 
   /**
+   * Gets the Azure Storage account name corresponding to this instance of 
configuration.
+   * @return the Azure Storage account name
+   */
+  public String getAccountName() {
+    return accountName;
+  }
+
+  /**
    * Appends an account name to a configuration key yielding the
    * account-specific form.
    * @param key Account-agnostic configuration key
@@ -436,7 +441,9 @@ public class AbfsConfiguration{
   }
 
   public boolean getCreateRemoteFileSystemDuringInitialization() {
-    return this.createRemoteFileSystemDuringInitialization;
+    // we do not support creating the filesystem when AuthType is SAS
+    return this.createRemoteFileSystemDuringInitialization
+        && this.getAuthType(this.accountName) != AuthType.SAS;
   }
 
   public boolean getSkipUserGroupMetadataDuringInitialization() {
@@ -578,35 +585,32 @@ public class AbfsConfiguration{
     }
   }
 
-  public String getAbfsExternalAuthorizationClass() {
-    return this.abfsExternalAuthorizationClass;
-  }
-
-  public AbfsAuthorizer getAbfsAuthorizer() throws IOException {
-    String authClassName = getAbfsExternalAuthorizationClass();
-    AbfsAuthorizer authorizer = null;
+  public SASTokenProvider getSASTokenProvider() throws 
AzureBlobFileSystemException {
+    AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, 
AuthType.SharedKey);
+    if (authType != AuthType.SAS) {
+      throw new SASTokenProviderException(String.format(
+        "Invalid auth type: %s is being used, expecting SAS", authType));
+    }
 
     try {
-      if (authClassName != null && !authClassName.isEmpty()) {
-        @SuppressWarnings("unchecked")
-        Class<AbfsAuthorizer> authClass = (Class<AbfsAuthorizer>) 
rawConfig.getClassByName(authClassName);
-        authorizer = authClass.getConstructor(new Class[] 
{Configuration.class}).newInstance(rawConfig);
-        LOG.trace("Initializing {}", authClassName);
-        authorizer.init();
-        LOG.trace("{} init complete", authClassName);
-      }
-    } catch (
-        IllegalAccessException
-        | InstantiationException
-        | ClassNotFoundException
-        | IllegalArgumentException
-        | InvocationTargetException
-        | NoSuchMethodException
-        | SecurityException
-        | AbfsAuthorizationException e) {
-      throw new IOException(e);
+      String configKey = FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
+      Class<? extends SASTokenProvider> sasTokenProviderClass =
+          getClass(configKey, null, SASTokenProvider.class);
+      Preconditions.checkArgument(sasTokenProviderClass != null,
+          String.format("The configuration value for \"%s\" is invalid.", 
configKey));
+
+      SASTokenProvider sasTokenProvider = ReflectionUtils
+          .newInstance(sasTokenProviderClass, rawConfig);
+      Preconditions.checkArgument(sasTokenProvider != null,
+          String.format("Failed to initialize %s", sasTokenProviderClass));
+
+      LOG.trace("Initializing {}", sasTokenProviderClass.getName());
+      sasTokenProvider.initialize(rawConfig, accountName);
+      LOG.trace("{} init complete", sasTokenProviderClass.getName());
+      return sasTokenProvider;
+    } catch (Exception e) {
+      throw new TokenAccessProviderException("Unable to load SAS token 
provider class: " + e, e);
     }
-    return authorizer;
   }
 
   void validateStorageAccountKeys() throws InvalidConfigurationValueException {
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 56ba9e3..8eda2f3 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -29,7 +29,6 @@ import java.net.URISyntaxException;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -66,9 +65,8 @@ import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizationException;
-import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizer;
 import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
@@ -96,7 +94,6 @@ public class AzureBlobFileSystem extends FileSystem {
 
   private boolean delegationTokenEnabled = false;
   private AbfsDelegationTokenManager delegationTokenManager;
-  private AbfsAuthorizer authorizer;
 
   @Override
   public void initialize(URI uri, Configuration configuration)
@@ -139,9 +136,6 @@ public class AzureBlobFileSystem extends FileSystem {
 
     
AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
 
-    // Initialize ABFS authorizer
-    //
-    this.authorizer = abfsConfiguration.getAbfsAuthorizer();
     LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
   }
 
@@ -170,7 +164,6 @@ public class AzureBlobFileSystem extends FileSystem {
     LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, 
bufferSize);
 
     Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.READ, qualifiedPath);
 
     try {
       InputStream inputStream = abfsStore.openFileForRead(qualifiedPath, 
statistics);
@@ -193,7 +186,6 @@ public class AzureBlobFileSystem extends FileSystem {
     trailingPeriodCheck(f);
 
     Path qualifiedPath = makeQualified(f);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     try {
       OutputStream outputStream = abfsStore.createFile(qualifiedPath, 
overwrite,
@@ -256,7 +248,6 @@ public class AzureBlobFileSystem extends FileSystem {
         bufferSize);
 
     Path qualifiedPath = makeQualified(f);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     try {
       OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, 
false);
@@ -315,7 +306,6 @@ public class AzureBlobFileSystem extends FileSystem {
       }
 
       qualifiedDstPath = makeQualified(adjustedDst);
-      performAbfsAuthCheck(FsAction.READ_WRITE, qualifiedSrcPath, 
qualifiedDstPath);
 
       abfsStore.rename(qualifiedSrcPath, qualifiedDstPath);
       return true;
@@ -340,7 +330,6 @@ public class AzureBlobFileSystem extends FileSystem {
         "AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), 
recursive);
 
     Path qualifiedPath = makeQualified(f);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     if (f.isRoot()) {
       if (!recursive) {
@@ -366,7 +355,6 @@ public class AzureBlobFileSystem extends FileSystem {
         "AzureBlobFileSystem.listStatus path: {}", f.toString());
 
     Path qualifiedPath = makeQualified(f);
-    performAbfsAuthCheck(FsAction.READ, qualifiedPath);
 
     try {
       FileStatus[] result = abfsStore.listStatus(qualifiedPath);
@@ -416,7 +404,6 @@ public class AzureBlobFileSystem extends FileSystem {
     }
 
     Path qualifiedPath = makeQualified(f);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     try {
       abfsStore.createDirectory(qualifiedPath, permission == null ? 
FsPermission.getDirDefault() : permission,
@@ -445,7 +432,6 @@ public class AzureBlobFileSystem extends FileSystem {
     LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f);
 
     Path qualifiedPath = makeQualified(f);
-    performAbfsAuthCheck(FsAction.READ, qualifiedPath);
 
     try {
       return abfsStore.getFileStatus(qualifiedPath);
@@ -627,7 +613,6 @@ public class AzureBlobFileSystem extends FileSystem {
     }
 
     Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     try {
       abfsStore.setOwner(qualifiedPath,
@@ -657,9 +642,6 @@ public class AzureBlobFileSystem extends FileSystem {
       throw new IllegalArgumentException("A valid name and value must be 
specified.");
     }
 
-    Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.READ_WRITE, qualifiedPath);
-
     try {
       Hashtable<String, String> properties = abfsStore.getPathStatus(path);
       String xAttrName = ensureValidAttributeName(name);
@@ -693,9 +675,6 @@ public class AzureBlobFileSystem extends FileSystem {
       throw new IllegalArgumentException("A valid name must be specified.");
     }
 
-    Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.READ, qualifiedPath);
-
     byte[] value = null;
     try {
       Hashtable<String, String> properties = abfsStore.getPathStatus(path);
@@ -735,7 +714,6 @@ public class AzureBlobFileSystem extends FileSystem {
     }
 
     Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     try {
       abfsStore.setPermission(qualifiedPath,
@@ -771,7 +749,6 @@ public class AzureBlobFileSystem extends FileSystem {
     }
 
     Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     try {
       abfsStore.modifyAclEntries(qualifiedPath,
@@ -805,7 +782,6 @@ public class AzureBlobFileSystem extends FileSystem {
     }
 
     Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     try {
       abfsStore.removeAclEntries(qualifiedPath, aclSpec);
@@ -831,7 +807,6 @@ public class AzureBlobFileSystem extends FileSystem {
     }
 
     Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     try {
       abfsStore.removeDefaultAcl(qualifiedPath);
@@ -859,7 +834,6 @@ public class AzureBlobFileSystem extends FileSystem {
     }
 
     Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     try {
       abfsStore.removeAcl(qualifiedPath);
@@ -894,7 +868,6 @@ public class AzureBlobFileSystem extends FileSystem {
     }
 
     Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
 
     try {
       abfsStore.setAcl(qualifiedPath, aclSpec);
@@ -921,7 +894,6 @@ public class AzureBlobFileSystem extends FileSystem {
     }
 
     Path qualifiedPath = makeQualified(path);
-    performAbfsAuthCheck(FsAction.READ, qualifiedPath);
 
     try {
       return abfsStore.getAclStatus(qualifiedPath);
@@ -1107,6 +1079,8 @@ public class AzureBlobFileSystem extends FileSystem {
       } else {
         throw ere;
       }
+    } else if (exception instanceof SASTokenProviderException) {
+      throw exception;
     } else {
       if (path == null) {
         throw exception;
@@ -1208,32 +1182,6 @@ public class AzureBlobFileSystem extends FileSystem {
     return abfsStore.getIsNamespaceEnabled();
   }
 
-  /**
-   * Use ABFS authorizer to check if user is authorized to perform specific
-   * {@link FsAction} on specified {@link Path}s.
-   *
-   * @param action The {@link FsAction} being requested on the provided {@link 
Path}s.
-   * @param paths The absolute paths of the storage being accessed.
-   * @throws AbfsAuthorizationException on authorization failure.
-   * @throws IOException network problems or similar.
-   * @throws IllegalArgumentException if the required parameters are not 
provided.
-   */
-  private void performAbfsAuthCheck(FsAction action, Path... paths)
-      throws AbfsAuthorizationException, IOException {
-    if (authorizer == null) {
-      LOG.debug("ABFS authorizer is not initialized. No authorization check 
will be performed.");
-    } else {
-      Preconditions.checkArgument(paths.length > 0, "no paths supplied for 
authorization check");
-
-      LOG.debug("Auth check for action: {} on paths: {}", action.toString(), 
Arrays.toString(paths));
-      if (!authorizer.isAuthorized(action, paths)) {
-        throw new AbfsAuthorizationException(
-            "User is not authorized for action " + action.toString()
-            + " on paths: " + Arrays.toString(paths));
-      }
-    }
-  }
-
   @Override
   public boolean hasPathCapability(final Path path, final String capability)
       throws IOException {
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 0e8afb5..bbf3608 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -71,6 +71,7 @@ import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
@@ -1130,8 +1131,9 @@ public class AzureBlobFileSystemStore implements 
Closeable {
 
     SharedKeyCredentials creds = null;
     AccessTokenProvider tokenProvider = null;
+    SASTokenProvider sasTokenProvider = null;
 
-    if (abfsConfiguration.getAuthType(accountName) == AuthType.SharedKey) {
+    if (authType == AuthType.SharedKey) {
       LOG.trace("Fetching SharedKey credentials");
       int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
       if (dotIndex <= 0) {
@@ -1140,6 +1142,9 @@ public class AzureBlobFileSystemStore implements 
Closeable {
       }
       creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
             abfsConfiguration.getStorageAccountKey());
+    } else if (authType == AuthType.SAS) {
+      LOG.trace("Fetching SAS token provider");
+      sasTokenProvider = abfsConfiguration.getSASTokenProvider();
     } else {
       LOG.trace("Fetching token provider");
       tokenProvider = abfsConfiguration.getTokenProvider();
@@ -1148,9 +1153,15 @@ public class AzureBlobFileSystemStore implements 
Closeable {
     }
 
     LOG.trace("Initializing AbfsClient for {}", baseUrl);
-    this.client =  new AbfsClient(baseUrl, creds, abfsConfiguration,
-        new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
-        tokenProvider, abfsPerfTracker);
+    if (tokenProvider != null) {
+      this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
+          new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
+          tokenProvider, abfsPerfTracker);
+    } else {
+      this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
+          new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
+          sasTokenProvider, abfsPerfTracker);
+    }
     LOG.trace("AbfsClient init complete");
   }
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 3642c35..3b0111e 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -128,7 +128,8 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ENABLE_DELEGATION_TOKEN = 
"fs.azure.enable.delegation.token";
   public static final String FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE = 
"fs.azure.delegation.token.provider.type";
 
-  public static final String ABFS_EXTERNAL_AUTHORIZATION_CLASS = 
"abfs.external.authorization.class";
+  /** Key for SAS token provider **/
+  public static final String FS_AZURE_SAS_TOKEN_PROVIDER_TYPE = 
"fs.azure.sas.token.provider.type";
 
   private ConfigurationKeys() {}
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/AbfsAuthorizationException.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/SASTokenProviderException.java
similarity index 63%
rename from 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/AbfsAuthorizationException.java
rename to 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/SASTokenProviderException.java
index 64a4820..09eca83 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/AbfsAuthorizationException.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/SASTokenProviderException.java
@@ -16,26 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.azurebfs.extensions;
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
 
-import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
- * Exception raised on ABFS Authorization failures.
+ * Thrown if there is an error instantiating the SASTokenProvider or getting
+ * a SAS token.
  */
-public class AbfsAuthorizationException extends IOException {
+@InterfaceAudience.Private
+public class SASTokenProviderException extends AzureBlobFileSystemException {
 
-  private static final long serialVersionUID = 1L;
-
-  public AbfsAuthorizationException(String message, Exception e) {
-    super(message, e);
-  }
-
-  public AbfsAuthorizationException(String message) {
+  public SASTokenProviderException(String message) {
     super(message);
   }
 
-  public AbfsAuthorizationException(Throwable e) {
-    super(e);
+  public SASTokenProviderException(String message, Throwable cause) {
+    super(message);
+    initCause(cause);
   }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/AbfsAuthorizer.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/AbfsAuthorizer.java
deleted file mode 100644
index f4495ec..0000000
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/AbfsAuthorizer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.extensions;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-
-/**
- * Interface to support authorization in Azure Blob File System.
- */
-@InterfaceAudience.LimitedPrivate("authorization-subsystems")
-@InterfaceStability.Unstable
-public interface AbfsAuthorizer {
-
-  /**
-   * Initialize authorizer for Azure Blob File System.
-   *
-   * @throws AbfsAuthorizationException if unable to initialize the authorizer.
-   * @throws IOException network problems or similar.
-   * @throws IllegalArgumentException if the required parameters are not 
provided.
-   */
-  void init() throws AbfsAuthorizationException, IOException;
-
-  /**
-   * Checks if the provided {@link FsAction} is allowed on the provided {@link 
Path}s.
-   *
-   * @param action the {@link FsAction} being requested on the provided {@link 
Path}s.
-   * @param absolutePaths The absolute paths of the storage being accessed.
-   * @return true if authorized, otherwise false.
-   * @throws AbfsAuthorizationException on authorization failure.
-   * @throws IOException network problems or similar.
-   * @throws IllegalArgumentException if the required parameters are not 
provided.
-   */
-  boolean isAuthorized(FsAction action, Path... absolutePaths)
-      throws AbfsAuthorizationException, IOException;
-
-}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java
new file mode 100644
index 0000000..9cfe2bc
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.extensions;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AccessControlException;
+
+/**
+ * Interface to support SAS authorization.
+ */
+@InterfaceAudience.LimitedPrivate("authorization-subsystems")
+@InterfaceStability.Unstable
+public interface SASTokenProvider {
+
+  String CONCAT_SOURCE_OPERATION = "concat-source";
+  String CONCAT_TARGET_OPERATION = "concat-target";
+  String CREATEFILE_OPERATION = "create";
+  String DELETE_OPERATION = "delete";
+  String EXECUTE_OPERATION = "execute";
+  String GETACL_OPERATION = "getaclstatus";
+  String GETFILESTATUS_OPERATION = "getfilestatus";
+  String LISTSTATUS_OPERATION = "liststatus";
+  String MKDIR_OPERATION = "mkdir";
+  String READ_OPERATION = "read";
+  String RENAME_SOURCE_OPERATION = "rename-source";
+  String RENAME_DESTINATION_OPERATION = "rename-destination";
+  String SETACL_OPERATION = "setacl";
+  String SETOWNER_OPERATION = "setowner";
+  String SETPERMISSION_OPERATION = "setpermission";
+  String APPEND_OPERATION = "write";
+
+  /**
+   * Initialize authorizer for Azure Blob File System.
+   * @param configuration Configuration object
+   * @param accountName Account Name
+   * @throws IOException network problems or similar.
+   */
+  void initialize(Configuration configuration, String accountName)
+      throws IOException;
+
+  /**
+   * Invokes the authorizer to obtain a SAS token.
+   *
+   * @param account the name of the storage account.
+   * @param fileSystem the name of the fileSystem.
+   * @param path the file or directory path.
+   * @param operation the operation to be performed on the path.
+   * @return a SAS token to perform the request operation.
+   * @throws IOException if there is a network error.
+   * @throws AccessControlException if access is denied.
+   */
+  String getSASToken(String account, String fileSystem, String path,
+      String operation) throws IOException, AccessControlException;
+}
\ No newline at end of file
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 374bde7..6e1de68 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -38,7 +38,9 @@ import org.slf4j.LoggerFactory;
 
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
 import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.io.IOUtils;
@@ -62,13 +64,14 @@ public class AbfsClient implements Closeable {
   private final String userAgent;
   private final AbfsPerfTracker abfsPerfTracker;
 
-  private final AccessTokenProvider tokenProvider;
+  private final String accountName;
+  private final AuthType authType;
+  private AccessTokenProvider tokenProvider;
+  private SASTokenProvider sasTokenProvider;
 
-
-  public AbfsClient(final URL baseUrl, final SharedKeyCredentials 
sharedKeyCredentials,
+  private AbfsClient(final URL baseUrl, final SharedKeyCredentials 
sharedKeyCredentials,
                     final AbfsConfiguration abfsConfiguration,
                     final ExponentialRetryPolicy exponentialRetryPolicy,
-                    final AccessTokenProvider tokenProvider,
                     final AbfsPerfTracker abfsPerfTracker) {
     this.baseUrl = baseUrl;
     this.sharedKeyCredentials = sharedKeyCredentials;
@@ -76,6 +79,8 @@ public class AbfsClient implements Closeable {
     this.filesystem = 
baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
     this.abfsConfiguration = abfsConfiguration;
     this.retryPolicy = exponentialRetryPolicy;
+    this.accountName = abfsConfiguration.getAccountName().substring(0, 
abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
+    this.authType = abfsConfiguration.getAuthType(accountName);
 
     String sslProviderName = null;
 
@@ -93,10 +98,27 @@ public class AbfsClient implements Closeable {
     }
 
     this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
-    this.tokenProvider = tokenProvider;
     this.abfsPerfTracker = abfsPerfTracker;
   }
 
+  public AbfsClient(final URL baseUrl, final SharedKeyCredentials 
sharedKeyCredentials,
+                    final AbfsConfiguration abfsConfiguration,
+                    final ExponentialRetryPolicy exponentialRetryPolicy,
+                    final AccessTokenProvider tokenProvider,
+                    final AbfsPerfTracker abfsPerfTracker) {
+    this(baseUrl, sharedKeyCredentials, abfsConfiguration, 
exponentialRetryPolicy, abfsPerfTracker);
+    this.tokenProvider = tokenProvider;
+  }
+
+  public AbfsClient(final URL baseUrl, final SharedKeyCredentials 
sharedKeyCredentials,
+                    final AbfsConfiguration abfsConfiguration,
+                    final ExponentialRetryPolicy exponentialRetryPolicy,
+                    final SASTokenProvider sasTokenProvider,
+                    final AbfsPerfTracker abfsPerfTracker) {
+    this(baseUrl, sharedKeyCredentials, abfsConfiguration, 
exponentialRetryPolicy, abfsPerfTracker);
+    this.sasTokenProvider = sasTokenProvider;
+  }
+
   @Override
   public void close() throws IOException {
     if (tokenProvider instanceof Closeable) {
@@ -191,6 +213,7 @@ public class AbfsClient implements Closeable {
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, 
String.valueOf(listMaxResults));
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, 
String.valueOf(abfsConfiguration.isUpnUsed()));
+    appendSASTokenToQuery(relativePath, SASTokenProvider.LISTSTATUS_OPERATION, 
abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -255,6 +278,11 @@ public class AbfsClient implements Closeable {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : 
DIRECTORY);
 
+    String operation = isFile
+        ? SASTokenProvider.CREATEFILE_OPERATION
+        : SASTokenProvider.MKDIR_OPERATION;
+    appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
+
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
             AbfsRestOperationType.CreatePath,
@@ -266,16 +294,24 @@ public class AbfsClient implements Closeable {
     return op;
   }
 
-  public AbfsRestOperation renamePath(final String source, final String 
destination, final String continuation)
+  public AbfsRestOperation renamePath(String source, final String destination, 
final String continuation)
           throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
 
-    final String encodedRenameSource = urlEncode(FORWARD_SLASH + 
this.getFileSystem() + source);
+    String encodedRenameSource = urlEncode(FORWARD_SLASH + 
this.getFileSystem() + source);
+    if (authType == AuthType.SAS) {
+      final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
+      appendSASTokenToQuery(source, SASTokenProvider.RENAME_SOURCE_OPERATION, 
srcQueryBuilder);
+      encodedRenameSource += srcQueryBuilder.toString();
+    }
+
+    LOG.trace("Rename source queryparam added {}", encodedRenameSource);
     requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, 
encodedRenameSource));
     requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+    appendSASTokenToQuery(destination, 
SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(destination, 
abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -299,6 +335,7 @@ public class AbfsClient implements Closeable {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, 
Long.toString(position));
+    appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, 
abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -324,6 +361,7 @@ public class AbfsClient implements Closeable {
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, 
Long.toString(position));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, 
String.valueOf(retainUncommittedData));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
+    appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, 
abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -365,6 +403,7 @@ public class AbfsClient implements Closeable {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, 
String.valueOf(abfsConfiguration.isUpnUsed()));
+    appendSASTokenToQuery(path, SASTokenProvider.GETFILESTATUS_OPERATION, 
abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -385,6 +424,7 @@ public class AbfsClient implements Closeable {
     requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
+    appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION, 
abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
 
@@ -409,6 +449,7 @@ public class AbfsClient implements Closeable {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, 
String.valueOf(recursive));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+    appendSASTokenToQuery(path, SASTokenProvider.DELETE_OPERATION, 
abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -438,6 +479,7 @@ public class AbfsClient implements Closeable {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, 
AbfsHttpConstants.SET_ACCESS_CONTROL);
+    appendSASTokenToQuery(path, SASTokenProvider.SETOWNER_OPERATION, 
abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -462,6 +504,7 @@ public class AbfsClient implements Closeable {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, 
AbfsHttpConstants.SET_ACCESS_CONTROL);
+    appendSASTokenToQuery(path, SASTokenProvider.SETPERMISSION_OPERATION, 
abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -494,6 +537,7 @@ public class AbfsClient implements Closeable {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, 
AbfsHttpConstants.SET_ACCESS_CONTROL);
+    appendSASTokenToQuery(path, SASTokenProvider.SETACL_OPERATION, 
abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -516,6 +560,7 @@ public class AbfsClient implements Closeable {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, 
AbfsHttpConstants.GET_ACCESS_CONTROL);
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, 
String.valueOf(useUPN));
+    appendSASTokenToQuery(path, SASTokenProvider.GETACL_OPERATION, 
abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -550,6 +595,34 @@ public class AbfsClient implements Closeable {
     return op;
   }
 
+  /**
+   * If configured for SAS AuthType, appends SAS token to queryBuilder
+   * @param path
+   * @param operation
+   * @param queryBuilder
+   * @throws SASTokenProviderException
+   */
+  private void appendSASTokenToQuery(String path, String operation, 
AbfsUriQueryBuilder queryBuilder) throws SASTokenProviderException {
+    if (this.authType == AuthType.SAS) {
+      try {
+        LOG.trace("Fetch SAS token for {} on {}", operation, path);
+        String sasToken = sasTokenProvider.getSASToken(this.accountName,
+            this.filesystem, path, operation);
+        if ((sasToken == null) || sasToken.isEmpty()) {
+          throw new UnsupportedOperationException("SASToken received is empty 
or null");
+        }
+
+        queryBuilder.setSASToken(sasToken);
+        LOG.trace("SAS token fetch complete for {} on {}", operation, path);
+      } catch (Exception ex) {
+        throw new SASTokenProviderException(String.format("Failed to acquire a 
SAS token for %s on %s due to %s",
+            operation,
+            path,
+            ex.toString()));
+      }
+    }
+  }
+
   private URL createRequestUrl(final String query) throws 
AzureBlobFileSystemException {
     return createRequestUrl(EMPTY_STRING, query);
   }
@@ -600,6 +673,10 @@ public class AbfsClient implements Closeable {
     }
   }
 
+  public AuthType getAuthType() {
+    return authType;
+  }
+
   @VisibleForTesting
   String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
                              final String sslProviderName) {
@@ -634,4 +711,9 @@ public class AbfsClient implements Closeable {
   URL getBaseUrl() {
     return baseUrl;
   }
+
+  @VisibleForTesting
+  public SASTokenProvider getSasTokenProvider() {
+    return this.sasTokenProvider;
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index d2bda08..445c366 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -160,18 +160,26 @@ public class AbfsRestOperation {
       // initialize the HTTP request and open the connection
       httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
 
-      // sign the HTTP request
-      if (client.getAccessToken() == null) {
-        LOG.debug("Signing request with shared key");
-        // sign the HTTP request
-        client.getSharedKeyCredentials().signRequest(
-                httpOperation.getConnection(),
-                hasRequestBody ? bufferLength : 0);
-      } else {
-        LOG.debug("Authenticating request with OAuth2 access token");
-        
httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
-                client.getAccessToken());
+      switch(client.getAuthType()) {
+        case Custom:
+        case OAuth:
+          LOG.debug("Authenticating request with OAuth2 access token");
+          
httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
+              client.getAccessToken());
+          break;
+        case SAS:
+          // do nothing; the SAS token should already be appended to the query 
string
+          break;
+        case SharedKey:
+          // sign the HTTP request
+          LOG.debug("Signing request with shared key");
+          // sign the HTTP request
+          client.getSharedKeyCredentials().signRequest(
+              httpOperation.getConnection(),
+              hasRequestBody ? bufferLength : 0);
+          break;
       }
+
       // dump the headers
       AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
           httpOperation.getConnection().getRequestProperties());
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
index a200b40..7820021 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
@@ -29,6 +29,7 @@ import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
  */
 public class AbfsUriQueryBuilder {
   private Map<String, String> parameters;
+  private String sasToken = null;
 
   public AbfsUriQueryBuilder() {
     this.parameters = new HashMap<>();
@@ -40,6 +41,10 @@ public class AbfsUriQueryBuilder {
     }
   }
 
+  public void setSASToken(final String sasToken) {
+    this.sasToken = sasToken;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -59,6 +64,16 @@ public class AbfsUriQueryBuilder {
         throw new IllegalArgumentException("Query string param is not 
encode-able: " + entry.getKey() + "=" + entry.getValue());
       }
     }
+    // append SAS Token
+    if (sasToken != null) {
+      if (first) {
+        sb.append(AbfsHttpConstants.QUESTION_MARK);
+      } else {
+        sb.append(AbfsHttpConstants.AND_MARK);
+      }
+
+      sb.append(sasToken);
+    }
     return sb.toString();
   }
-}
+}
\ No newline at end of file
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
index c95b92c..03ffece 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
@@ -23,5 +23,6 @@ package org.apache.hadoop.fs.azurebfs.services;
 public enum AuthType {
     SharedKey,
     OAuth,
-    Custom
+    Custom,
+    SAS
 }
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md 
b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 79ec2ad..01c1fbd 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -626,7 +626,7 @@ points for third-parties to integrate their authentication 
and authorization
 services into the ABFS client.
 
 * `CustomDelegationTokenManager` : adds ability to issue Hadoop Delegation 
Tokens.
-* `AbfsAuthorizer` permits client-side authorization of file operations.
+* `SASTokenProvider`: allows for custom provision of Azure Storage Shared 
Access Signature (SAS) tokens.
 * `CustomTokenProviderAdaptee`: allows for custom provision of
 Azure OAuth tokens.
 * `KeyProvider`.
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 3eaed1f..a42648f 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -73,6 +74,7 @@ public abstract class AbstractAbfsIntegrationTest extends
   private String accountName;
   private String testUrl;
   private AuthType authType;
+  private boolean useConfiguredFileSystem = false;
 
   protected AbstractAbfsIntegrationTest() throws Exception {
     fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
@@ -134,7 +136,9 @@ public abstract class AbstractAbfsIntegrationTest extends
     createFileSystem();
 
     // Only live account without namespace support can run ABFS&WASB 
compatibility tests
-    if (!isIPAddress && !abfs.getIsNamespaceEnabled()) {
+    if (!isIPAddress
+        && (abfsConfig.getAuthType(accountName) != AuthType.SAS)
+        && !abfs.getIsNamespaceEnabled()) {
       final URI wasbUri = new URI(abfsUrlToWasbUrl(getTestUrl()));
       final AzureNativeFileSystemStore azureNativeFileSystemStore =
           new AzureNativeFileSystemStore();
@@ -167,19 +171,21 @@ public abstract class AbstractAbfsIntegrationTest extends
         return;
       }
 
-      final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore();
-      abfsStore.deleteFilesystem();
-
-      AbfsRestOperationException ex = intercept(
-              AbfsRestOperationException.class,
-              new Callable<Hashtable<String, String>>() {
-                @Override
-                public Hashtable<String, String> call() throws Exception {
-                  return abfsStore.getFilesystemProperties();
-                }
-              });
-      if (FILE_SYSTEM_NOT_FOUND.getStatusCode() != ex.getStatusCode()) {
-        LOG.warn("Deleted test filesystem may still exist: {}", abfs, ex);
+      // Delete all uniquely created filesystem from the account
+      if (!useConfiguredFileSystem) {
+        final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore();
+        abfsStore.deleteFilesystem();
+
+        AbfsRestOperationException ex = 
intercept(AbfsRestOperationException.class,
+            new Callable<Hashtable<String, String>>() {
+              @Override
+              public Hashtable<String, String> call() throws Exception {
+                return abfsStore.getFilesystemProperties();
+              }
+            });
+        if (FILE_SYSTEM_NOT_FOUND.getStatusCode() != ex.getStatusCode()) {
+          LOG.warn("Deleted test filesystem may still exist: {}", abfs, ex);
+        }
       }
     } catch (Exception e) {
       LOG.warn("During cleanup: {}", e, e);
@@ -189,6 +195,32 @@ public abstract class AbstractAbfsIntegrationTest extends
     }
   }
 
+
+  public void loadConfiguredFileSystem() throws Exception {
+      // disable auto-creation of filesystem
+      
abfsConfig.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+          false);
+
+      // AbstractAbfsIntegrationTest always uses a new instance of FileSystem,
+      // need to disable that and force filesystem provided in test configs.
+      String[] authorityParts =
+          (new 
URI(rawConfig.get(FS_AZURE_CONTRACT_TEST_URI))).getRawAuthority().split(
+        AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 
2);
+      this.fileSystemName = authorityParts[0];
+
+      // Reset URL with configured filesystem
+      final String abfsUrl = this.getFileSystemName() + "@" + 
this.getAccountName();
+      URI defaultUri = null;
+
+      defaultUri = new URI(abfsScheme, abfsUrl, null, null, null);
+
+      this.testUrl = defaultUri.toString();
+      abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+          defaultUri.toString());
+
+    useConfiguredFileSystem = true;
+  }
+
   public AzureBlobFileSystem getFileSystem() throws IOException {
     return abfs;
   }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
index 0a2df95..b44914e 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
@@ -62,7 +62,6 @@ public class ITestAbfsIdentityTransformer extends 
AbstractAbfsScaleTest{
 
   public ITestAbfsIdentityTransformer() throws Exception {
     super();
-    UserGroupInformation.reset();
     userGroupInfo = UserGroupInformation.getCurrentUser();
     localUser = userGroupInfo.getShortUserName();
     localGroup = userGroupInfo.getPrimaryGroupName();
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java
index e579c14..94e0ce3 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java
@@ -18,54 +18,107 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
+import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
 import java.util.UUID;
 
+import org.junit.Assume;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
-import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizationException;
-import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
+import org.apache.hadoop.fs.azurebfs.extensions.MockSASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 
-import static org.apache.hadoop.fs.azurebfs.extensions.MockAbfsAuthorizer.*;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.MOCK_SASTOKENPROVIDER_FAIL_INIT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN;
 import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
 import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
 import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
-import static org.junit.Assume.assumeTrue;
 
 /**
  * Test Perform Authorization Check operation
  */
 public class ITestAzureBlobFileSystemAuthorization extends 
AbstractAbfsIntegrationTest {
 
-  private static final Path TEST_READ_ONLY_FILE_PATH_0 = new 
Path(TEST_READ_ONLY_FILE_0);
-  private static final Path TEST_READ_ONLY_FOLDER_PATH = new 
Path(TEST_READ_ONLY_FOLDER);
-  private static final Path TEST_WRITE_ONLY_FILE_PATH_0 = new 
Path(TEST_WRITE_ONLY_FILE_0);
-  private static final Path TEST_WRITE_ONLY_FILE_PATH_1 = new 
Path(TEST_WRITE_ONLY_FILE_1);
-  private static final Path TEST_READ_WRITE_FILE_PATH_0 = new 
Path(TEST_READ_WRITE_FILE_0);
-  private static final Path TEST_READ_WRITE_FILE_PATH_1 = new 
Path(TEST_READ_WRITE_FILE_1);
-  private static final Path TEST_WRITE_ONLY_FOLDER_PATH = new 
Path(TEST_WRITE_ONLY_FOLDER);
-  private static final Path TEST_WRITE_THEN_READ_ONLY_PATH = new 
Path(TEST_WRITE_THEN_READ_ONLY);
-  private static final String TEST_AUTHZ_CLASS = 
"org.apache.hadoop.fs.azurebfs.extensions.MockAbfsAuthorizer";
+  private static final String TEST_AUTHZ_CLASS = 
"org.apache.hadoop.fs.azurebfs.extensions.MockSASTokenProvider";
+  private static final String TEST_ERR_AUTHZ_CLASS = 
"org.apache.hadoop.fs.azurebfs.extensions.MockErrorSASTokenProvider";
   private static final String TEST_USER = UUID.randomUUID().toString();
   private static final String TEST_GROUP = UUID.randomUUID().toString();
   private static final String BAR = UUID.randomUUID().toString();
 
   public ITestAzureBlobFileSystemAuthorization() throws Exception {
+    // The mock SAS token provider relies on the account key to generate SAS.
+    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
   }
 
   @Override
   public void setup() throws Exception {
-    
this.getConfiguration().set(ConfigurationKeys.ABFS_EXTERNAL_AUTHORIZATION_CLASS,
 TEST_AUTHZ_CLASS);
+    boolean isHNSEnabled = this.getConfiguration().getBoolean(
+        TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
+    Assume.assumeTrue(isHNSEnabled);
+    loadConfiguredFileSystem();
+    
this.getConfiguration().set(ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, 
TEST_AUTHZ_CLASS);
+    
this.getConfiguration().set(ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME,
 "SAS");
     super.setup();
   }
 
   @Test
+  public void testSASTokenProviderInitializeException() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+
+    final AzureBlobFileSystem testFs = new AzureBlobFileSystem();
+    Configuration testConfig = this.getConfiguration().getRawConfiguration();
+    testConfig.set(ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, 
TEST_ERR_AUTHZ_CLASS);
+    testConfig.set(MOCK_SASTOKENPROVIDER_FAIL_INIT, "true");
+
+    intercept(TokenAccessProviderException.class,
+        ()-> {
+          testFs.initialize(fs.getUri(), 
this.getConfiguration().getRawConfiguration());
+        });
+  }
+
+  @Test
+  public void testSASTokenProviderEmptySASToken() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+
+    final AzureBlobFileSystem testFs = new AzureBlobFileSystem();
+    Configuration testConfig = this.getConfiguration().getRawConfiguration();
+    testConfig.set(ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, 
TEST_ERR_AUTHZ_CLASS);
+    testConfig.set(MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN, "true");
+
+    testFs.initialize(fs.getUri(),
+        this.getConfiguration().getRawConfiguration());
+    intercept(SASTokenProviderException.class,
+        () -> {
+          testFs.create(new org.apache.hadoop.fs.Path("/testFile"));
+        });
+  }
+
+  @Test
+  public void testSASTokenProviderNullSASToken() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+
+    final AzureBlobFileSystem testFs = new AzureBlobFileSystem();
+    Configuration testConfig = this.getConfiguration().getRawConfiguration();
+    testConfig.set(ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, 
TEST_ERR_AUTHZ_CLASS);
+
+    testFs.initialize(fs.getUri(), 
this.getConfiguration().getRawConfiguration());
+    intercept(SASTokenProviderException.class,
+        ()-> {
+          testFs.create(new org.apache.hadoop.fs.Path("/testFile"));
+        });
+  }
+
+  @Test
   public void testOpenFileWithInvalidPath() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
     intercept(IllegalArgumentException.class,
@@ -76,291 +129,232 @@ public class ITestAzureBlobFileSystemAuthorization 
extends AbstractAbfsIntegrati
 
   @Test
   public void testOpenFileAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    fs.open(TEST_WRITE_THEN_READ_ONLY_PATH).close();
+    runTest(FileSystemOperations.Open, false);
   }
 
   @Test
   public void testOpenFileUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.open(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    });
+    runTest(FileSystemOperations.Open, true);
   }
 
   @Test
   public void testCreateFileAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
+    runTest(FileSystemOperations.CreatePath, false);
   }
 
   @Test
   public void testCreateFileUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.create(TEST_READ_ONLY_FILE_PATH_0).close();
-    });
+    runTest(FileSystemOperations.CreatePath, true);
   }
 
   @Test
   public void testAppendFileAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    fs.append(TEST_WRITE_ONLY_FILE_PATH_0).close();
+    runTest(FileSystemOperations.Append, false);
   }
 
   @Test
   public void testAppendFileUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.append(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    });
+    runTest(FileSystemOperations.Append, true);
   }
 
   @Test
   public void testRenameAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    fs.rename(TEST_READ_WRITE_FILE_PATH_0, TEST_READ_WRITE_FILE_PATH_1);
+    runTest(FileSystemOperations.RenamePath, false);
   }
 
   @Test
   public void testRenameUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.rename(TEST_WRITE_ONLY_FILE_PATH_0, TEST_WRITE_ONLY_FILE_PATH_1);
-    });
+    runTest(FileSystemOperations.RenamePath, true);
   }
 
   @Test
   public void testDeleteFileAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    fs.delete(TEST_WRITE_ONLY_FILE_PATH_0, false);
+    runTest(FileSystemOperations.DeletePath, false);
   }
 
   @Test
   public void testDeleteFileUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.delete(TEST_WRITE_THEN_READ_ONLY_PATH, false);
-    });
+    runTest(FileSystemOperations.DeletePath, true);
   }
 
   @Test
   public void testListStatusAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    fs.listStatus(TEST_WRITE_THEN_READ_ONLY_PATH);
+    runTest(FileSystemOperations.ListPaths, false);
   }
 
   @Test
   public void testListStatusUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.listStatus(TEST_WRITE_ONLY_FILE_PATH_0);
-    });
+    runTest(FileSystemOperations.ListPaths, true);
   }
 
   @Test
   public void testMkDirsAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    fs.mkdirs(TEST_WRITE_ONLY_FOLDER_PATH, new FsPermission(FsAction.ALL, 
FsAction.NONE, FsAction.NONE));
+    runTest(FileSystemOperations.Mkdir, false);
   }
 
   @Test
   public void testMkDirsUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.mkdirs(TEST_READ_ONLY_FOLDER_PATH, new FsPermission(FsAction.ALL, 
FsAction.NONE, FsAction.NONE));
-    });
+    runTest(FileSystemOperations.Mkdir, true);
   }
 
   @Test
   public void testGetFileStatusAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    fs.getFileStatus(TEST_WRITE_THEN_READ_ONLY_PATH);
+    runTest(FileSystemOperations.GetPathStatus, false);
   }
 
   @Test
   public void testGetFileStatusUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.getFileStatus(TEST_WRITE_ONLY_FILE_PATH_0);
-    });
-  }
-
-  @Test
-  public void testSetOwnerAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    fs.setOwner(TEST_WRITE_ONLY_FILE_PATH_0, TEST_USER, TEST_GROUP);
+    runTest(FileSystemOperations.GetPathStatus, true);
   }
 
   @Test
   public void testSetOwnerUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.setOwner(TEST_WRITE_THEN_READ_ONLY_PATH, TEST_USER, TEST_GROUP);
-    });
-  }
-
-  @Test
-  public void testSetPermissionAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    fs.setPermission(TEST_WRITE_ONLY_FILE_PATH_0, new 
FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+    runTest(FileSystemOperations.SetOwner, true);
   }
 
   @Test
   public void testSetPermissionUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.setPermission(TEST_WRITE_THEN_READ_ONLY_PATH, new 
FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
-    });
-  }
-
-  @Test
-  public void testModifyAclEntriesAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    List<AclEntry> aclSpec = Arrays.asList(aclEntry(ACCESS, GROUP, BAR, 
FsAction.ALL));
-    fs.modifyAclEntries(TEST_WRITE_ONLY_FILE_PATH_0, aclSpec);
+    runTest(FileSystemOperations.SetPermissions, true);
   }
 
   @Test
   public void testModifyAclEntriesUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    List<AclEntry> aclSpec = Arrays.asList(aclEntry(ACCESS, GROUP, BAR, 
FsAction.ALL));
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.modifyAclEntries(TEST_WRITE_THEN_READ_ONLY_PATH, aclSpec);
-    });
-  }
-
-  @Test
-  public void testRemoveAclEntriesAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    List<AclEntry> aclSpec = Arrays.asList(aclEntry(ACCESS, GROUP, BAR, 
FsAction.ALL));
-    fs.removeAclEntries(TEST_WRITE_ONLY_FILE_PATH_0, aclSpec);
+    runTest(FileSystemOperations.ModifyAclEntries, true);
   }
 
   @Test
   public void testRemoveAclEntriesUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    List<AclEntry> aclSpec = Arrays.asList(aclEntry(ACCESS, GROUP, BAR, 
FsAction.ALL));
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.removeAclEntries(TEST_WRITE_THEN_READ_ONLY_PATH, aclSpec);
-    });
-  }
-
-  @Test
-  public void testRemoveDefaultAclAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    fs.removeDefaultAcl(TEST_WRITE_ONLY_FILE_PATH_0);
+    runTest(FileSystemOperations.RemoveAclEntries, true);
   }
 
   @Test
   public void testRemoveDefaultAclUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.removeDefaultAcl(TEST_WRITE_THEN_READ_ONLY_PATH);
-    });
-  }
-
-  @Test
-  public void testRemoveAclAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    fs.removeAcl(TEST_WRITE_ONLY_FILE_PATH_0);
+    runTest(FileSystemOperations.RemoveDefaultAcl, true);
   }
 
   @Test
   public void testRemoveAclUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.removeAcl(TEST_WRITE_THEN_READ_ONLY_PATH);
-    });
-  }
-
-  @Test
-  public void testSetAclAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    List<AclEntry> aclSpec = Arrays.asList(aclEntry(ACCESS, GROUP, BAR, 
FsAction.ALL));
-    fs.setAcl(TEST_WRITE_ONLY_FILE_PATH_0, aclSpec);
+    runTest(FileSystemOperations.RemoveAcl, true);
   }
 
   @Test
   public void testSetAclUnauthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    List<AclEntry> aclSpec = Arrays.asList(aclEntry(ACCESS, GROUP, BAR, 
FsAction.ALL));
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.setAcl(TEST_WRITE_THEN_READ_ONLY_PATH, aclSpec);
-    });
+    runTest(FileSystemOperations.SetAcl, true);
   }
 
   @Test
   public void testGetAclStatusAuthorized() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
-    List<AclEntry> aclSpec = Arrays.asList(aclEntry(ACCESS, GROUP, BAR, 
FsAction.ALL));
-    fs.getAclStatus(TEST_WRITE_THEN_READ_ONLY_PATH);
+    runTest(FileSystemOperations.GetAcl, false);
   }
 
   @Test
   public void testGetAclStatusUnauthorized() throws Exception {
+    runTest(FileSystemOperations.GetAcl, true);
+  }
+
+
+  private void runTest(FileSystemOperations testOp,
+      boolean expectAbfsAuthorizationException) throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
-    assumeTrue("This test case only runs when namespace is enabled", 
fs.getIsNamespaceEnabled());
-    fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
-    List<AclEntry> aclSpec = Arrays.asList(aclEntry(ACCESS, GROUP, BAR, 
FsAction.ALL));
-    intercept(AbfsAuthorizationException.class,
-        ()-> {
-          fs.getAclStatus(TEST_WRITE_ONLY_FILE_PATH_0);
-    });
+
+    Path reqPath = new Path("requestPath"
+        + UUID.randomUUID().toString()
+        + (expectAbfsAuthorizationException ? "unauthorized":""));
+
+    getMockSASTokenProvider(fs).setSkipAuthorizationForTestSetup(true);
+    if ((testOp != FileSystemOperations.CreatePath)
+        && (testOp != FileSystemOperations.Mkdir)) {
+      fs.create(reqPath).close();
+      fs.getFileStatus(reqPath);
+    }
+    getMockSASTokenProvider(fs).setSkipAuthorizationForTestSetup(false);
+
+    // Test Operation
+    if (expectAbfsAuthorizationException) {
+      intercept(SASTokenProviderException.class, () -> {
+        executeOp(reqPath, fs, testOp);
+      });
+    } else {
+      executeOp(reqPath, fs, testOp);
+    }
+  }
+
+  private void executeOp(Path reqPath, AzureBlobFileSystem fs,
+      FileSystemOperations op) throws IOException, IOException {
+
+
+    switch (op) {
+    case ListPaths:
+      fs.listStatus(reqPath);
+      break;
+    case CreatePath:
+      fs.create(reqPath);
+      break;
+    case RenamePath:
+      fs.rename(reqPath,
+          new Path("renameDest" + UUID.randomUUID().toString()));
+      break;
+    case GetAcl:
+      fs.getAclStatus(reqPath);
+      break;
+    case GetPathStatus:
+      fs.getFileStatus(reqPath);
+      break;
+    case SetAcl:
+      fs.setAcl(reqPath, Arrays
+          .asList(aclEntry(ACCESS, GROUP, BAR, FsAction.ALL)));
+      break;
+    case SetOwner:
+      fs.setOwner(reqPath, TEST_USER, TEST_GROUP);
+      break;
+    case SetPermissions:
+      fs.setPermission(reqPath,
+          new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+      break;
+    case Append:
+      fs.append(reqPath);
+      break;
+    case ReadFile:
+      fs.open(reqPath);
+      break;
+    case Open:
+      fs.open(reqPath);
+      break;
+    case DeletePath:
+      fs.delete(reqPath, false);
+      break;
+    case Mkdir:
+      fs.mkdirs(reqPath,
+          new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+      break;
+    case RemoveAclEntries:
+      fs.removeAclEntries(reqPath, Arrays
+          .asList(aclEntry(ACCESS, GROUP, BAR, FsAction.ALL)));
+      break;
+    case ModifyAclEntries:
+      fs.modifyAclEntries(reqPath, Arrays
+          .asList(aclEntry(ACCESS, GROUP, BAR, FsAction.ALL)));
+      break;
+    case RemoveAcl:
+      fs.removeAcl(reqPath);
+      break;
+    case RemoveDefaultAcl:
+      fs.removeDefaultAcl(reqPath);
+      break;
+    default:
+      throw new IllegalStateException("Unexpected value: " + op);
+    }
+  }
+
+  private MockSASTokenProvider getMockSASTokenProvider(AzureBlobFileSystem fs)
+      throws Exception {
+    return ((MockSASTokenProvider) 
fs.getAbfsStore().getClient().getSasTokenProvider());
+  }
+
+  enum FileSystemOperations {
+    None, ListPaths, CreatePath, RenamePath, GetAcl, GetPathStatus, SetAcl,
+    SetOwner, SetPermissions, Append, ReadFile, DeletePath, Mkdir,
+    RemoveAclEntries, RemoveDefaultAcl, RemoveAcl, ModifyAclEntries,
+    Open
   }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
index 579e76f..c8dcef3 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
@@ -42,6 +42,9 @@ public final class TestConfigurationKeys {
 
   public static final String FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID = 
"fs.azure.check.access.testuser.guid";
 
+  public static final String MOCK_SASTOKENPROVIDER_FAIL_INIT = 
"mock.sastokenprovider.fail.init";
+  public static final String MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN = 
"mock.sastokenprovider.return.empty.sasToken";
+
   public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml";
   public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
   public static final int TEST_TIMEOUT = 15 * 60 * 1000;
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockAbfsAuthorizer.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockAbfsAuthorizer.java
deleted file mode 100644
index 6820edd..0000000
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockAbfsAuthorizer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.extensions;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-
-/**
- * A mock Azure Blob File System Authorization Implementation
- */
-public class MockAbfsAuthorizer implements AbfsAuthorizer {
-
-  public static final String TEST_READ_ONLY_FILE_0 = "readOnlyFile0";
-  public static final String TEST_READ_ONLY_FILE_1 = "readOnlyFile1";
-  public static final String TEST_READ_ONLY_FOLDER = "readOnlyFolder";
-  public static final String TEST_WRITE_ONLY_FILE_0 = "writeOnlyFile0";
-  public static final String TEST_WRITE_ONLY_FILE_1 = "writeOnlyFile1";
-  public static final String TEST_WRITE_ONLY_FOLDER = "writeOnlyFolder";
-  public static final String TEST_READ_WRITE_FILE_0 = "readWriteFile0";
-  public static final String TEST_READ_WRITE_FILE_1 = "readWriteFile1";
-  public static final String TEST_WRITE_THEN_READ_ONLY = 
"writeThenReadOnlyFile";
-  private Configuration conf;
-  private Set<Path> readOnlyPaths = new HashSet<Path>();
-  private Set<Path> writeOnlyPaths = new HashSet<Path>();
-  private Set<Path> readWritePaths = new HashSet<Path>();
-  private int writeThenReadOnly = 0;
-  public MockAbfsAuthorizer(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public void init() throws AbfsAuthorizationException, IOException {
-    readOnlyPaths.add(new Path(TEST_READ_ONLY_FILE_0));
-    readOnlyPaths.add(new Path(TEST_READ_ONLY_FILE_1));
-    readOnlyPaths.add(new Path(TEST_READ_ONLY_FOLDER));
-    writeOnlyPaths.add(new Path(TEST_WRITE_ONLY_FILE_0));
-    writeOnlyPaths.add(new Path(TEST_WRITE_ONLY_FILE_1));
-    writeOnlyPaths.add(new Path(TEST_WRITE_ONLY_FOLDER));
-    readWritePaths.add(new Path(TEST_READ_WRITE_FILE_0));
-    readWritePaths.add(new Path(TEST_READ_WRITE_FILE_1));
-  }
-
-  @Override
-  public boolean isAuthorized(FsAction action, Path... absolutePaths) throws 
AbfsAuthorizationException, IOException {
-    Set<Path> paths = new HashSet<Path>();
-    for (Path path : absolutePaths) {
-      paths.add(new Path(path.getName()));
-    }
-
-    if (action.equals(FsAction.READ) && Stream.concat(readOnlyPaths.stream(), 
readWritePaths.stream()).collect(Collectors.toSet()).containsAll(paths)) {
-      return true;
-    } else if (action.equals(FsAction.READ) && paths.contains(new 
Path(TEST_WRITE_THEN_READ_ONLY)) && writeThenReadOnly == 1) {
-      return true;
-    } else if (action.equals(FsAction.WRITE)
-        && Stream.concat(writeOnlyPaths.stream(), 
readWritePaths.stream()).collect(Collectors.toSet()).containsAll(paths)) {
-      return true;
-    } else if (action.equals(FsAction.WRITE) && paths.contains(new 
Path(TEST_WRITE_THEN_READ_ONLY)) && writeThenReadOnly == 0) {
-      writeThenReadOnly = 1;
-      return true;
-    } else {
-      return action.equals(FsAction.READ_WRITE) && 
readWritePaths.containsAll(paths);
-    }
-  }
-}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockErrorSASTokenProvider.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockErrorSASTokenProvider.java
new file mode 100644
index 0000000..c334664
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockErrorSASTokenProvider.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.extensions;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.MOCK_SASTOKENPROVIDER_FAIL_INIT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN;
+
+/**
+ * A mock SAS token provider to test error conditions.
+ */
+public class MockErrorSASTokenProvider implements SASTokenProvider {
+  Configuration config = null;
+
+  @Override
+  public void initialize(org.apache.hadoop.conf.Configuration configuration, 
String accountName) {
+    boolean throwExceptionAtInit = 
configuration.getBoolean(MOCK_SASTOKENPROVIDER_FAIL_INIT, false);
+    if (throwExceptionAtInit) {
+      throw new RuntimeException("MockSASTokenProvider initialize exception");
+    }
+
+    this.config = configuration;
+  }
+
+  /**
+   * Returns null SAS token query or Empty if returnEmptySASToken is set.
+   * @param accountName
+   * @param fileSystem the name of the fileSystem.
+   * @param path the file or directory path.
+   * @param operation the operation to be performed on the path.
+   * @return
+   */
+  @Override
+  public String getSASToken(String accountName, String fileSystem, String path,
+      String operation) {
+    boolean returnEmptySASTokenQuery = this.config.getBoolean(
+        MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN, false);
+
+    if (returnEmptySASTokenQuery) {
+      return "";
+    } else { return null; }
+  }
+
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java
new file mode 100644
index 0000000..de841b0
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.extensions;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AccessControlException;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.utils.Base64;
+import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
+
+/**
+ * A mock SAS token provider implementation
+ */
+public class MockSASTokenProvider implements SASTokenProvider {
+
+  private byte[] accountKey;
+  private SASGenerator generator;
+  private boolean skipAuthorizationForTestSetup = false;
+
+  // For testing we use a container SAS for all operations.
+  private String generateSAS(byte[] accountKey, String accountName, String 
fileSystemName) {
+     return generator.getContainerSASWithFullControl(accountName, 
fileSystemName);
+  }
+
+  @Override
+  public void initialize(Configuration configuration, String accountName) 
throws IOException {
+    try {
+      AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, 
accountName);
+      accountKey = Base64.decode(abfsConfig.getStorageAccountKey());
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    generator = new SASGenerator(accountKey);
+  }
+
+  /**
+   * Invokes the authorizer to obtain a SAS token.
+   *
+   * @param accountName the name of the storage account.
+   * @param fileSystem the name of the fileSystem.
+   * @param path the file or directory path.
+   * @param operation the operation to be performed on the path.
+   * @return a SAS token to perform the request operation.
+   * @throws IOException if there is a network error.
+   * @throws AccessControlException if access is denied.
+   */
+  @Override
+  public String getSASToken(String accountName, String fileSystem, String path,
+                     String operation) throws IOException, 
AccessControlException {
+    if (!isSkipAuthorizationForTestSetup() && path.contains("unauthorized")) {
+      throw new AccessControlException(
+          "The user is not authorized to perform this operation.");
+    }
+
+    return generateSAS(accountKey, accountName, fileSystem);
+  }
+
+  public boolean isSkipAuthorizationForTestSetup() {
+    return skipAuthorizationForTestSetup;
+  }
+
+  public void setSkipAuthorizationForTestSetup(
+      boolean skipAuthorizationForTestSetup) {
+    this.skipAuthorizationForTestSetup = skipAuthorizationForTestSetup;
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 7df9fb1..deca8b3 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -21,10 +21,12 @@ package org.apache.hadoop.fs.azurebfs.services;
 import java.net.URL;
 import java.util.regex.Pattern;
 
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
@@ -36,14 +38,15 @@ import org.apache.hadoop.util.VersionInfo;
  */
 public final class TestAbfsClient {
 
-  private final String accountName = "bogusAccountName";
+  private final String accountName = "bogusAccountName.dfs.core.windows.net";
 
   private void validateUserAgent(String expectedPattern,
                                  URL baseUrl,
                                  AbfsConfiguration config,
-                                 boolean includeSSLProvider) {
+                                 boolean includeSSLProvider)
+      throws AzureBlobFileSystemException {
     AbfsClient client = new AbfsClient(baseUrl, null,
-        config, null, null, null);
+        config, null, (AccessTokenProvider) null, null);
     String sslProviderName = null;
     if (includeSSLProvider) {
       sslProviderName = 
DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
new file mode 100644
index 0000000..19bf9e2
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.io.UnsupportedEncodingException;
+import java.time.format.DateTimeFormatter;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Locale;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder;
+
+/**
+ * Test container SAS generator.
+ */
+public class SASGenerator {
+
+  private static final String HMAC_SHA256 = "HmacSHA256";
+  private static final int TOKEN_START_PERIOD_IN_SECONDS = 5 * 60;
+  private static final int TOKEN_EXPIRY_PERIOD_IN_SECONDS = 24 * 60 * 60;
+  public static final DateTimeFormatter ISO_8601_UTC_DATE_FORMATTER =
+      DateTimeFormatter
+          .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT)
+          .withZone(ZoneId.of("UTC"));
+  private Mac hmacSha256;
+  private byte[] key;
+
+  public SASGenerator(byte[] key) {
+    this.key = key;
+    initializeMac();
+  }
+
+  public String getContainerSASWithFullControl(String accountName, String 
containerName) {
+    String sp = "rcwdl";
+    String sv = "2018-11-09";
+    String sr = "c";
+    String st = 
ISO_8601_UTC_DATE_FORMATTER.format(Instant.now().minusSeconds(TOKEN_START_PERIOD_IN_SECONDS));
+    String se =
+        
ISO_8601_UTC_DATE_FORMATTER.format(Instant.now().plusSeconds(TOKEN_EXPIRY_PERIOD_IN_SECONDS));
+
+    String signature = computeSignatureForSAS(sp, st, se, sv, "c",
+        accountName, containerName);
+
+    AbfsUriQueryBuilder qb = new AbfsUriQueryBuilder();
+    qb.addQuery("sp", sp);
+    qb.addQuery("st", st);
+    qb.addQuery("se", se);
+    qb.addQuery("sv", sv);
+    qb.addQuery("sr", sr);
+    qb.addQuery("sig", signature);
+    return qb.toString().substring(1);
+  }
+
+  private String computeSignatureForSAS(String sp, String st,
+      String se, String sv, String sr, String accountName, String 
containerName) {
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(sp);
+    sb.append("\n");
+    sb.append(st);
+    sb.append("\n");
+    sb.append(se);
+    sb.append("\n");
+    // canonicalized resource
+    sb.append("/blob/");
+    sb.append(accountName);
+    sb.append("/");
+    sb.append(containerName);
+    sb.append("\n");
+    sb.append("\n"); // si
+    sb.append("\n"); // sip
+    sb.append("\n"); // spr
+    sb.append(sv);
+    sb.append("\n");
+    sb.append(sr);
+    sb.append("\n");
+    sb.append("\n"); // - For optional : rscc - ResponseCacheControl
+    sb.append("\n"); // - For optional : rscd - ResponseContentDisposition
+    sb.append("\n"); // - For optional : rsce - ResponseContentEncoding
+    sb.append("\n"); // - For optional : rscl - ResponseContentLanguage
+    sb.append("\n"); // - For optional : rsct - ResponseContentType
+
+    String stringToSign = sb.toString();
+    return computeHmac256(stringToSign);
+  }
+
+  private void initializeMac() {
+    // Initializes the HMAC-SHA256 Mac and SecretKey.
+    try {
+      hmacSha256 = Mac.getInstance(HMAC_SHA256);
+      hmacSha256.init(new SecretKeySpec(key, HMAC_SHA256));
+    } catch (final Exception e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  private String computeHmac256(final String stringToSign) {
+    byte[] utf8Bytes;
+    try {
+      utf8Bytes = stringToSign.getBytes(AbfsHttpConstants.UTF_8);
+    } catch (final UnsupportedEncodingException e) {
+      throw new IllegalArgumentException(e);
+    }
+    byte[] hmac;
+    synchronized (this) {
+      hmac = hmacSha256.doFinal(utf8Bytes);
+    }
+    return Base64.encode(hmac);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml 
b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
index d833cfb..d43d67e 100644
--- a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
+++ b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
@@ -30,7 +30,7 @@
 
   <property>
     <name>fs.azure.test.namespace.enabled</name>
-    <value>false</value>
+    <value>true</value>
   </property>
 
   <property>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to