This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9fb695869e4 HADOOP-19736: ABFS. Support for new auth type: User-bound 
SAS (#8051)
9fb695869e4 is described below

commit 9fb695869e4775091ff38ef939ca0a7e7c508c23
Author: manika137 <[email protected]>
AuthorDate: Thu Nov 27 19:14:46 2025 -0800

    HADOOP-19736: ABFS. Support for new auth type: User-bound SAS (#8051)
    
    Contributed by Manika Joshi
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  77 +++-
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  24 ++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  26 +-
 .../fs/azurebfs/constants/AbfsHttpConstants.java   |   4 +-
 .../fs/azurebfs/services/AbfsBlobClient.java       |  14 +-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  29 +-
 .../fs/azurebfs/services/AbfsClientHandler.java    |  70 ++--
 .../hadoop/fs/azurebfs/services/AbfsDfsClient.java |  27 +-
 .../hadoop/fs/azurebfs/services/AbfsErrors.java    |   2 +
 .../fs/azurebfs/services/AbfsRestOperation.java    |   5 +
 .../hadoop/fs/azurebfs/services/AuthType.java      |   3 +-
 .../hadoop-azure/src/site/markdown/index.md        |  67 +++-
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |  19 +
 .../ITestAzureBlobFileSystemUserBoundSAS.java      | 414 +++++++++++++++++++++
 .../azurebfs/constants/TestConfigurationKeys.java  |   3 +
 .../extensions/MockDelegationSASTokenProvider.java |   8 +-
 .../extensions/MockInvalidSASTokenProvider.java    |  48 +++
 ...der.java => MockUserBoundSASTokenProvider.java} | 113 ++++--
 .../fs/azurebfs/services/ITestAbfsClient.java      | 133 +++++--
 .../services/ITestAbfsTailLatencyTracker.java      |   4 +-
 .../fs/azurebfs/services/TestAbfsClient.java       |   2 +
 .../fs/azurebfs/services/TestAbfsInputStream.java  |   3 +-
 ...ava => DelegationSASGeneratorVersionJuly5.java} |  92 ++++-
 .../hadoop/fs/azurebfs/utils/SASGenerator.java     |   3 +-
 24 files changed, 1009 insertions(+), 181 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index a54ee2bd444..e4c88b208b9 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -1240,9 +1240,9 @@ public int getNumLeaseThreads() {
   }
 
   public boolean getCreateRemoteFileSystemDuringInitialization() {
-    // we do not support creating the filesystem when AuthType is SAS
+    // we do not support creating the filesystem when AuthType is SAS or 
UserboundSASWithOAuth
     return this.createRemoteFileSystemDuringInitialization
-        && this.getAuthType(this.accountName) != AuthType.SAS;
+        && !(validateForSASType(this.getAuthType(this.accountName)));
   }
 
   public boolean getSkipUserGroupMetadataDuringInitialization() {
@@ -1413,9 +1413,14 @@ public boolean shouldTrackLatency() {
     return this.trackLatency;
   }
 
+  public boolean validateForSASType(AuthType authType){
+    return authType == AuthType.SAS
+        || authType == AuthType.UserboundSASWithOAuth;
+  }
+
   public AccessTokenProvider getTokenProvider() throws 
TokenAccessProviderException {
     AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, 
AuthType.SharedKey);
-    if (authType == AuthType.OAuth) {
+    if (authType == AuthType.OAuth || authType == 
AuthType.UserboundSASWithOAuth) {
       try {
         Class<? extends AccessTokenProvider> tokenProviderClass =
             getTokenProviderClass(authType,
@@ -1563,7 +1568,7 @@ public AccessTokenProvider getTokenProvider() throws 
TokenAccessProviderExceptio
    * the AbfsConfiguration with which a filesystem is initialized, and 
eliminate
    * chances of dynamic modifications and spurious situations.<br>
    * @return sasTokenProvider object based on configurations provided
-   * @throws AzureBlobFileSystemException
+   * @throws AzureBlobFileSystemException if SAS token provider initialization 
fails
    */
   public SASTokenProvider getSASTokenProvider() throws 
AzureBlobFileSystemException {
     AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, 
AuthType.SharedKey);
@@ -1610,6 +1615,70 @@ public SASTokenProvider getSASTokenProvider() throws 
AzureBlobFileSystemExceptio
     }
   }
 
+  /**
+   * Returns the SASTokenProvider implementation to be used to generate 
user-bound SAS token.
+   * Custom implementation of {@link SASTokenProvider} under the config
+   * "fs.azure.sas.token.provider.type" needs to be provided.
+   * @param authType authentication type
+   * @return sasTokenProvider object based on configurations provided
+   * @throws AzureBlobFileSystemException if user-bound SAS token provider 
initialization fails
+   */
+  public SASTokenProvider getUserBoundSASTokenProvider(AuthType authType)
+      throws AzureBlobFileSystemException {
+
+    try {
+      Class<? extends SASTokenProvider> customSasTokenProviderImplementation =
+          getTokenProviderClass(authType, FS_AZURE_SAS_TOKEN_PROVIDER_TYPE,
+              null, SASTokenProvider.class);
+
+      if (customSasTokenProviderImplementation == null) {
+        throw new SASTokenProviderException(String.format(
+            "\"%s\" must be set for user-bound SAS auth type.",
+            FS_AZURE_SAS_TOKEN_PROVIDER_TYPE));
+      }
+
+        SASTokenProvider sasTokenProvider = ReflectionUtils.newInstance(
+            customSasTokenProviderImplementation, rawConfig);
+        if (sasTokenProvider == null) {
+          throw new SASTokenProviderException(String.format(
+              "Failed to initialize %s", 
customSasTokenProviderImplementation));
+        }
+        LOG.trace("Initializing {}", 
customSasTokenProviderImplementation.getName());
+        sasTokenProvider.initialize(rawConfig, accountName);
+        LOG.trace("{} init complete", 
customSasTokenProviderImplementation.getName());
+        return sasTokenProvider;
+    } catch (SASTokenProviderException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SASTokenProviderException(
+          "Unable to load user-bound SAS token provider class: " + e, e);
+    }
+  }
+
+  /**
+   * Returns both the AccessTokenProvider and the SASTokenProvider
+   * when auth type is UserboundSASWithOAuth.
+   *
+   * @return Object[] where:
+   *   [0] = AccessTokenProvider
+   *   [1] = SASTokenProvider
+   * @throws AzureBlobFileSystemException if provider initialization fails
+   */
+  public Object[] getUserBoundSASBothTokenProviders()
+      throws AzureBlobFileSystemException {
+    AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME,
+        AuthType.SharedKey);
+    if (authType != AuthType.UserboundSASWithOAuth) {
+      throw new SASTokenProviderException(String.format(
+          "Invalid auth type: %s is being used, expecting user-bound SAS.",
+          authType));
+    }
+
+    AccessTokenProvider tokenProvider = getTokenProvider();
+    SASTokenProvider sasTokenProvider = getUserBoundSASTokenProvider(authType);
+    return new Object[]{tokenProvider, sasTokenProvider};
+  }
+
   public EncryptionContextProvider createEncryptionContextProvider() {
     try {
       String configKey = FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 2687bc70835..572bc873b1c 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -133,6 +133,7 @@
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_INVALID_ABFS_STATE;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.UNAUTHORIZED_SAS;
+import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.UNAUTHORIZED_USER_BOUND_SAS;
 import static 
org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
 import static 
org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -271,6 +272,29 @@ public void initialize(URI uri, Configuration 
configuration)
       throw new 
InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
     }
 
+    /**
+     * Validates that User-bound SAS with OAuth is not used for FNS 
(non-hierarchical namespace) accounts.
+     * Throws an InvalidConfigurationValueException if this configuration is 
detected.
+     *
+     * @throws InvalidConfigurationValueException if User-bound SAS with OAuth 
is configured for FNS accounts
+     */
+    try {
+      if (abfsConfiguration.getAuthType(abfsConfiguration.getAccountName())
+          == AuthType.UserboundSASWithOAuth && // Auth type is User-bound SAS
+          !tryGetIsNamespaceEnabled(
+              new TracingContext(initFSTracingContext))) { // Account is FNS
+        throw new 
InvalidConfigurationValueException(UNAUTHORIZED_USER_BOUND_SAS);
+      }
+    } catch (InvalidConfigurationValueException ex) {
+      LOG.error("User-bound SAS not supported for FNS Accounts", ex);
+      throw ex;
+    } catch (AzureBlobFileSystemException ex) {
+      LOG.error("Failed to determine account type for auth type validation",
+          ex);
+      throw new InvalidConfigurationValueException(
+          FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
+    }
+
     /*
      * Non-hierarchical-namespace account can not have a 
customer-provided-key(CPK).
      * Fail initialization of filesystem if the configs are provided. CPK is of
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 0cec6a7e87f..d3f1ecd7cee 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -1754,11 +1754,20 @@ private void initializeClient(URI uri, String 
fileSystemName,
     } else if (authType == AuthType.SAS) {
       LOG.trace("Fetching SAS Token Provider");
       sasTokenProvider = abfsConfiguration.getSASTokenProvider();
+    } else if (authType == AuthType.UserboundSASWithOAuth) {
+      LOG.trace("Fetching SAS and OAuth Token Provider for user bound SAS");
+      AzureADAuthenticator.init(abfsConfiguration);
+      Object[] providers
+          = abfsConfiguration.getUserBoundSASBothTokenProviders();
+      tokenProvider = (AccessTokenProvider) providers[0];
+      sasTokenProvider = (SASTokenProvider) providers[1];
+      ExtensionHelper.bind(tokenProvider, uri,
+          abfsConfiguration.getRawConfiguration());
     } else {
       LOG.trace("Fetching token provider");
       tokenProvider = abfsConfiguration.getTokenProvider();
       ExtensionHelper.bind(tokenProvider, uri,
-            abfsConfiguration.getRawConfiguration());
+          abfsConfiguration.getRawConfiguration());
     }
 
     // Encryption setup
@@ -1782,16 +1791,11 @@ private void initializeClient(URI uri, String 
fileSystemName,
       }
     }
 
-    LOG.trace("Initializing AbfsClient for {}", baseUrl);
-    if (tokenProvider != null) {
-      this.clientHandler = new AbfsClientHandler(baseUrl, creds, 
abfsConfiguration,
-          tokenProvider, encryptionContextProvider,
-          populateAbfsClientContext());
-    } else {
-      this.clientHandler = new AbfsClientHandler(baseUrl, creds, 
abfsConfiguration,
-          sasTokenProvider, encryptionContextProvider,
-          populateAbfsClientContext());
-    }
+    LOG.trace("Initializing AbfsClientHandler for {}", baseUrl);
+    this.clientHandler = new AbfsClientHandler(baseUrl, creds,
+        abfsConfiguration,
+        tokenProvider, sasTokenProvider, encryptionContextProvider,
+        populateAbfsClientContext());
 
     this.setClient(getClientHandler().getClient());
     LOG.trace("AbfsClient init complete");
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index a751101cf57..5b3b5ad3ac2 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -147,6 +147,7 @@ public final class AbfsHttpConstants {
   public static final String APPLICATION_JSON = "application/json";
   public static final String APPLICATION_OCTET_STREAM = 
"application/octet-stream";
   public static final String APPLICATION_XML = "application/xml";
+  public static final String APPLICATION_X_WWW_FORM_URLENCODED = 
"application/x-www-form-urlencoded";
   public static final String XMS_PROPERTIES_ENCODING_ASCII = "ISO-8859-1";
   public static final String XMS_PROPERTIES_ENCODING_UNICODE = "UTF-8";
 
@@ -189,7 +190,8 @@ public enum ApiVersion {
     DEC_12_2019("2019-12-12"),
     APR_10_2021("2021-04-10"),
     AUG_03_2023("2023-08-03"),
-    NOV_04_2024("2024-11-04");
+    NOV_04_2024("2024-11-04"),
+    JUL_05_2025("2025-07-05");
 
     private final String xMsApiVersion;
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 52ec53e4f13..0210dd543d6 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -186,22 +186,10 @@ public AbfsBlobClient(final URL baseUrl,
       final SharedKeyCredentials sharedKeyCredentials,
       final AbfsConfiguration abfsConfiguration,
       final AccessTokenProvider tokenProvider,
-      final EncryptionContextProvider encryptionContextProvider,
-      final AbfsClientContext abfsClientContext) throws IOException {
-    super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
-        encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
-    this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
-        abfsConfiguration.getAzureAtomicRenameDirs()
-            .split(AbfsHttpConstants.COMMA)));
-  }
-
-  public AbfsBlobClient(final URL baseUrl,
-      final SharedKeyCredentials sharedKeyCredentials,
-      final AbfsConfiguration abfsConfiguration,
       final SASTokenProvider sasTokenProvider,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext) throws IOException {
-    super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
+    super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, 
sasTokenProvider,
         encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
     this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
         abfsConfiguration.getAzureAtomicRenameDirs()
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 4d9ceee9e3f..74bddff682a 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -344,22 +344,24 @@ private AbfsClient(final URL baseUrl,
     LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
   }
 
-  public AbfsClient(final URL baseUrl,
-      final SharedKeyCredentials sharedKeyCredentials,
-      final AbfsConfiguration abfsConfiguration,
-      final AccessTokenProvider tokenProvider,
-      final EncryptionContextProvider encryptionContextProvider,
-      final AbfsClientContext abfsClientContext,
-      final AbfsServiceType abfsServiceType)
-      throws IOException {
-    this(baseUrl, sharedKeyCredentials, abfsConfiguration,
-        encryptionContextProvider, abfsClientContext, abfsServiceType);
-    this.tokenProvider = tokenProvider;
-  }
 
+  /**
+   * Constructs an AbfsClient instance with all authentication and 
configuration options.
+   *
+   * @param baseUrl The base URL for the ABFS endpoint.
+   * @param sharedKeyCredentials Shared key credentials for authentication.
+   * @param abfsConfiguration The ABFS configuration.
+   * @param tokenProvider The access token provider for OAuth authentication.
+   * @param sasTokenProvider The SAS token provider for SAS authentication.
+   * @param encryptionContextProvider The encryption context provider.
+   * @param abfsClientContext The client context
+   * @param abfsServiceType The ABFS service type (e.g., Blob, DFS).
+   * @throws IOException if initialization fails.
+   */
   public AbfsClient(final URL baseUrl,
       final SharedKeyCredentials sharedKeyCredentials,
       final AbfsConfiguration abfsConfiguration,
+      final AccessTokenProvider tokenProvider,
       final SASTokenProvider sasTokenProvider,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext,
@@ -368,6 +370,7 @@ public AbfsClient(final URL baseUrl,
     this(baseUrl, sharedKeyCredentials, abfsConfiguration,
         encryptionContextProvider, abfsClientContext, abfsServiceType);
     this.sasTokenProvider = sasTokenProvider;
+    this.tokenProvider = tokenProvider;
   }
 
   @Override
@@ -1179,7 +1182,7 @@ protected String appendSASTokenToQuery(String path,
                                          String cachedSasToken)
       throws SASTokenProviderException {
     String sasToken = null;
-    if (this.authType == AuthType.SAS) {
+    if (getAbfsConfiguration().validateForSASType(this.authType)) {
       try {
         LOG.trace("Fetch SAS token for {} on {}", operation, path);
         if (cachedSasToken == null) {
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
index a7bf5699dc2..393811c256b 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
@@ -47,24 +47,26 @@ public class AbfsClientHandler implements Closeable {
   private final AbfsDfsClient dfsAbfsClient;
   private final AbfsBlobClient blobAbfsClient;
 
-  public AbfsClientHandler(final URL baseUrl,
-      final SharedKeyCredentials sharedKeyCredentials,
-      final AbfsConfiguration abfsConfiguration,
-      final AccessTokenProvider tokenProvider,
-      final EncryptionContextProvider encryptionContextProvider,
-      final AbfsClientContext abfsClientContext) throws IOException {
-    this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
-        abfsConfiguration, tokenProvider, null, encryptionContextProvider,
-        abfsClientContext);
-    this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
-        abfsConfiguration, tokenProvider, null, encryptionContextProvider,
-        abfsClientContext);
-    initServiceType(abfsConfiguration);
-  }
 
+  /**
+   * Constructs an AbfsClientHandler instance.
+   *
+   * Initializes the default and ingress service types from the provided 
configuration,
+   * then creates both DFS and Blob clients using the given parameters.
+   *
+   * @param baseUrl the base URL for the file system.
+   * @param sharedKeyCredentials credentials for shared key authentication.
+   * @param abfsConfiguration the ABFS configuration.
+   * @param tokenProvider the access token provider, may be null.
+   * @param sasTokenProvider the SAS token provider, may be null.
+   * @param encryptionContextProvider the encryption context provider
+   * @param abfsClientContext the ABFS client context.
+   * @throws IOException if client creation or URL conversion fails.
+   */
   public AbfsClientHandler(final URL baseUrl,
       final SharedKeyCredentials sharedKeyCredentials,
       final AbfsConfiguration abfsConfiguration,
+      final AccessTokenProvider tokenProvider,
       final SASTokenProvider sasTokenProvider,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext) throws IOException {
@@ -73,10 +75,10 @@ public AbfsClientHandler(final URL baseUrl,
     // only for default client.
     initServiceType(abfsConfiguration);
     this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
-        abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
+        abfsConfiguration, tokenProvider, sasTokenProvider, 
encryptionContextProvider,
         abfsClientContext);
     this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
-        abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
+        abfsConfiguration, tokenProvider, sasTokenProvider, 
encryptionContextProvider,
         abfsClientContext);
   }
 
@@ -154,17 +156,13 @@ private AbfsDfsClient createDfsClient(final URL baseUrl,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext) throws IOException {
     URL dfsUrl = changeUrlFromBlobToDfs(baseUrl);
-    if (tokenProvider != null) {
-      LOG.debug("Creating AbfsDfsClient with access token provider using the 
URL: {}", dfsUrl);
-      return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
-          tokenProvider, encryptionContextProvider,
-          abfsClientContext);
-    } else {
-      LOG.debug("Creating AbfsDfsClient with SAS token provider using the URL: 
{}", dfsUrl);
-      return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
-          sasTokenProvider, encryptionContextProvider,
-          abfsClientContext);
-    }
+    LOG.debug(
+        "Creating AbfsDfsClient with access token provider: %s and "
+            + "SAS token provider: %s using the URL: %s",
+        tokenProvider, sasTokenProvider, dfsUrl);
+    return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
+        tokenProvider, sasTokenProvider, encryptionContextProvider,
+        abfsClientContext);
   }
 
   /**
@@ -188,17 +186,13 @@ private AbfsBlobClient createBlobClient(final URL baseUrl,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext) throws IOException {
     URL blobUrl = changeUrlFromDfsToBlob(baseUrl);
-    if (tokenProvider != null) {
-      LOG.debug("Creating AbfsBlobClient with access token provider using the 
URL: {}", blobUrl);
-      return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
-          tokenProvider, encryptionContextProvider,
-          abfsClientContext);
-    } else {
-      LOG.debug("Creating AbfsBlobClient with SAS token provider using the 
URL: {}", blobUrl);
-      return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
-          sasTokenProvider, encryptionContextProvider,
-          abfsClientContext);
-    }
+    LOG.debug(
+        "Creating AbfsBlobClient with access token provider: %s and "
+            + "SAS token provider: %s using the URL: %s",
+        tokenProvider, sasTokenProvider, blobUrl);
+    return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
+        tokenProvider, sasTokenProvider, encryptionContextProvider,
+        abfsClientContext);
   }
 
   @Override
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index 08ef93d1008..3a9244c2f82 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -160,27 +160,7 @@ public class AbfsDfsClient extends AbfsClient {
    * @param baseUrl the base URL of the DFS endpoint
    * @param sharedKeyCredentials the shared key credentials
    * @param abfsConfiguration the ABFS configuration
-   * @param tokenProvider the access token provider for authentication
-   * @param encryptionContextProvider the encryption context provider
-   * @param abfsClientContext the ABFS client context
-   * @throws IOException if client initialization fails
-   */
-  public AbfsDfsClient(final URL baseUrl,
-      final SharedKeyCredentials sharedKeyCredentials,
-      final AbfsConfiguration abfsConfiguration,
-      final AccessTokenProvider tokenProvider,
-      final EncryptionContextProvider encryptionContextProvider,
-      final AbfsClientContext abfsClientContext) throws IOException {
-    super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
-        encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
-  }
-
-  /**
-   * Creates an {@code AbfsDfsClient} instance.
-   *
-   * @param baseUrl the base URL of the DFS endpoint
-   * @param sharedKeyCredentials the shared key credentials
-   * @param abfsConfiguration the ABFS configuration
+   * @param tokenProvider the OAuth access token provider
    * @param sasTokenProvider the SAS token provider
    * @param encryptionContextProvider the encryption context provider
    * @param abfsClientContext the ABFS client context
@@ -189,10 +169,11 @@ public AbfsDfsClient(final URL baseUrl,
   public AbfsDfsClient(final URL baseUrl,
       final SharedKeyCredentials sharedKeyCredentials,
       final AbfsConfiguration abfsConfiguration,
+      final AccessTokenProvider tokenProvider,
       final SASTokenProvider sasTokenProvider,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext) throws IOException {
-    super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
+    super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, 
sasTokenProvider,
         encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
   }
 
@@ -1781,7 +1762,7 @@ private List<AbfsHttpHeader> getHeadersForRename(final 
String source)
     String encodedRenameSource = urlEncode(
         FORWARD_SLASH + this.getFileSystem() + source);
 
-    if (getAuthType() == AuthType.SAS) {
+    if (getAbfsConfiguration().validateForSASType(getAuthType())) {
       final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
       appendSASTokenToQuery(source,
           SASTokenProvider.RENAME_SOURCE_OPERATION, srcQueryBuilder);
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
index 9b1fd394cba..fe7f3b5cb1b 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
@@ -64,6 +64,8 @@ public final class AbfsErrors {
    */
   public static final String UNAUTHORIZED_SAS
       = "Incorrect SAS token provider configured for non-hierarchical 
namespace account with DFS service type.";
+  public static final String UNAUTHORIZED_USER_BOUND_SAS
+      = "User bound SAS auth type is not supported on non-hierarchical 
namespace accounts.";
   public static final String ERR_RENAME_BLOB =
       "FNS-Blob rename was not successful for source and destination path: ";
   public static final String ERR_DELETE_BLOB =
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index c6b331af80d..ff7300d280f 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -595,6 +595,11 @@ public void signRequest(final AbfsHttpOperation 
httpOperation, int bytesToSign)
         // do nothing; the SAS token should already be appended to the query 
string
         httpOperation.setMaskForSAS(); //mask sig/oid from url for logs
         break;
+      case UserboundSASWithOAuth:
+        
httpOperation.setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
+            client.getAccessToken());
+        httpOperation.setMaskForSAS(); //mask sig/oid from url for logs
+        break;
       case SharedKey:
       default:
         // sign the HTTP request
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
index 03ffece350e..cda8f686453 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java
@@ -24,5 +24,6 @@ public enum AuthType {
     SharedKey,
     OAuth,
     Custom,
-    SAS
+    SAS,
+    UserboundSASWithOAuth
 }
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md 
b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index e52555ef76f..bed15b8ff56 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -303,8 +303,10 @@ driven by them.
 3. Deployed in-Azure with the Azure VMs providing OAuth 2.0 tokens to the 
application, "Managed Instance".
 4. Using Shared Access Signature (SAS) tokens provided by a custom 
implementation of the SASTokenProvider interface.
 5. By directly configuring a fixed Shared Access Signature (SAS) token in the 
account configuration settings files.
+6. Using user-bound SAS auth type, which requires both OAuth 2.0 setup (point 
2 above) and SAS setup (point 4 above)
 
-Note: SAS Based Authentication should be used only with HNS Enabled accounts.
+Note: While user-bound SAS Authentication is **only supported** with HNS 
Enabled accounts, we **recommend** using HNS Enabled
+accounts with SAS authentication as well.
 
 What can be changed is what secrets/credentials are used to authenticate the 
caller.
 
@@ -370,7 +372,21 @@ To retrieve using shell script, specify the path to the 
script for the config
 `fs.azure.shellkeyprovider.script`. ShellDecryptionKeyProvider class use the
 script specified to retrieve the key.
 
-### <a name="oauth-client-credentials"></a> OAuth 2.0 Client Credentials
+### <a name="oauth-authentication"></a> OAuth 2.0 Authentication
+The below are the main options of identity configurations for OAuth settings.
+All of these would have OAuth set as the auth type
+
+```xml
+<property>
+  <name>fs.azure.account.auth.type</name>
+  <value>OAuth</value>
+  <description>
+    Use OAuth authentication
+  </description>
+</property>
+```
+
+#### <a name="oauth-client-credentials"></a> Client Credentials
 
 OAuth 2.0 credentials of (client id, client secret, endpoint) are provided in 
the configuration/JCEKS file.
 
@@ -416,7 +432,7 @@ the key names are slightly different here.
 </property>
 ```
 
-### <a name="oauth-user-and-passwd"></a> OAuth 2.0: Username and Password
+#### <a name="oauth-user-and-passwd"></a> Username and Password
 
 An OAuth 2.0 endpoint, username and password are provided in the 
configuration/JCEKS file.
 
@@ -458,7 +474,7 @@ An OAuth 2.0 endpoint, username and password are provided 
in the configuration/J
 </property>
 ```
 
-### <a name="oauth-refresh-token"></a> OAuth 2.0: Refresh Token
+#### <a name="oauth-refresh-token"></a> Refresh Token
 
 With an existing Oauth 2.0 token, make a request to the Active Directory 
endpoint
 `https://login.microsoftonline.com/Common/oauth2/token` for this token to be 
refreshed.
@@ -501,7 +517,7 @@ With an existing Oauth 2.0 token, make a request to the 
Active Directory endpoin
 </property>
 ```
 
-### <a name="managed-identity"></a> Azure Managed Identity
+#### <a name="managed-identity"></a> Azure Managed Identity
 
 [Azure Managed 
Identities](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview),
 formerly "Managed Service Identities".
 
@@ -549,7 +565,7 @@ The Azure Portal/CLI is used to create the service identity.
 </property>
 ```
 
-### <a name="workload-identity"></a> Azure Workload Identity
+#### <a name="workload-identity"></a> Azure Workload Identity
 
 [Azure Workload 
Identities](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview),
 formerly "Azure AD pod identity".
 
@@ -783,6 +799,45 @@ requests. User can specify them as fixed SAS Token to be 
used across all the req
     - fs.azure.sas.fixed.token.ACCOUNT_NAME
     - fs.azure.sas.fixed.token
 
+### User-bound SAS
+- **Description**: The user-bound SAS auth type allows tracking the usage of 
the generated SAS token — something
+ that was not possible with the user-delegation SAS authentication type. Reach 
out to us at '[email protected]' for more information.
+ To use this authentication type, both a custom SAS token provider class (that 
implements org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider) and
+    an OAuth 2.0 provider type need to be specified.
+    - Refer to 'Shared Access Signature (SAS) Token Provider' section above 
for user-delegation SAS token provider class details and example class 
implementation.
+    - There are multiple identity configurations for OAuth settings. Listing 
the main ones below:
+        - Client Credentials
+        - Custom token provider
+        - Managed Identity
+        - Workload Identity
+
+      Refer to the respective OAuth 2.0 sections above to correctly choose the 
OAuth provider type.
+    - NOTE: User-bound SAS Authentication is **only supported** with HNS 
Enabled accounts.
+
+- **Configuration**: To use this method with ABFS Driver, specify the 
following properties in your `core-site.xml` file:
+
+    1. Authentication Type:
+        ```xml
+        <property>
+          <name>fs.azure.account.auth.type</name>
+          <value>UserboundSASWithOAuth</value>
+        </property>
+        ```
+    2. OAuth 2.0 Provider Type:
+        ```xml
+        <property>
+          <name>fs.azure.account.oauth.provider.type</name>
+          
<value>org.apache.hadoop.fs.azurebfs.oauth2.ADD_CHOSEN_OAUTH_IDENTITY_CONFIGURATION</value>
+        </property>
+       ```
+    3. Custom SAS Token Provider Class:
+        ```xml
+        <property>
+          <name>fs.azure.sas.token.provider.type</name>
+          <value>CUSTOM_SAS_TOKEN_PROVIDER_CLASS</value>
+        </property>
+        ```
+
 ## <a name="technical"></a> Technical notes
 
 ### <a name="proxy"></a> Proxy setup
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 798b1943e07..c7762172f3e 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -200,6 +200,8 @@ public void setup() throws Exception {
     // Only live account without namespace support can run ABFS&WASB
     // compatibility tests
     if (!isIPAddress && (abfsConfig.getAuthType(accountName) != AuthType.SAS)
+        && (abfsConfig.getAuthType(accountName)
+        != AuthType.UserboundSASWithOAuth)
         && !abfs.getIsNamespaceEnabled(getTestTracingContext(
             getFileSystem(), false))) {
       final URI wasbUri = new URI(
@@ -326,6 +328,20 @@ protected void 
createFilesystemWithTestFileForSASTests(Path testPath) throws Exc
     }
   }
 
+  /**
+   * Create a filesystem for user-bound SAS tests. The filesystem is first
+   * created with the currently configured (SharedKey) authentication to verify
+   * the root path exists; the auth type is then switched to
+   * UserboundSASWithOAuth for the remainder of the test.
+   *
+   * @throws Exception if the filesystem cannot be created or the root path
+   * does not exist.
+   */
+  protected void createFilesystemForUserBoundSASTests() throws Exception{
+    try (AzureBlobFileSystem tempFs = (AzureBlobFileSystem) 
FileSystem.newInstance(rawConfig)){
+      ContractTestUtils.assertPathExists(tempFs, "This path should exist",
+          new Path("/"));
+      abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, 
AuthType.UserboundSASWithOAuth.name());
+      usingFilesystemForSASTests = true;
+    }
+  }
+
   public AzureBlobFileSystem getFileSystem() throws IOException {
     return abfs;
   }
@@ -588,6 +604,9 @@ protected void assumeValidAuthConfigsPresent() {
     assumeThat(currentAuthType).
         as("SAS Based Authentication Not Allowed For Integration Tests").
         isNotEqualTo(AuthType.SAS);
+    assumeThat(currentAuthType).
+        as("User-bound SAS Based Authentication Not Allowed For Integration 
Tests").
+        isNotEqualTo(AuthType.UserboundSASWithOAuth);
     if (currentAuthType == AuthType.SharedKey) {
       assumeValidTestConfigPresent(getRawConfiguration(), 
FS_AZURE_ACCOUNT_KEY);
     } else {
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemUserBoundSAS.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemUserBoundSAS.java
new file mode 100644
index 00000000000..044de137499
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemUserBoundSAS.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.AccessDeniedException;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.extensions.MockInvalidSASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.MockUserBoundSASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import org.apache.hadoop.fs.permission.AclEntry;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CLIENT_SERVICE_PRINCIPAL_OBJECT_ID;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_TENANT_ID;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_END_USER_OBJECT_ID;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_END_USER_TENANT_ID;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Integration tests for AzureBlobFileSystem using User-Bound SAS and OAuth.
+ * Covers scenarios for token provider configuration, SAS token validity, and 
basic file operations.
+ */
+public class ITestAzureBlobFileSystemUserBoundSAS
+    extends AbstractAbfsIntegrationTest {
+
+  private static Path testPath = new Path("/test.txt");
+
+  private static final String TEST_OBJECT_ID = "123456789";
+
+  private static final String INVALID_OAUTH_TOKEN_VALUE = 
"InvalidOAuthTokenValue";
+
+  /**
+   * Constructor. Ensures tests run with SharedKey authentication.
+   * @throws Exception if auth type is not SharedKey
+   */
+  protected ITestAzureBlobFileSystemUserBoundSAS() throws Exception {
+    assumeThat(this.getAuthType()).isEqualTo(AuthType.SharedKey);
+    
assumeThat(this.getConfiguration().getIsNamespaceEnabledAccount().toBoolean()).
+        isEqualTo(true);
+  }
+
+  /**
+   * Sets up the test environment and configures the AbfsConfiguration for 
user-bound SAS tests.
+   * @throws Exception if setup fails
+   */
+  @BeforeEach
+  @Override
+  public void setup() throws Exception {
+    AbfsConfiguration abfsConfig = this.getConfiguration();
+    String accountName = getAccountName();
+
+    createFilesystemForUserBoundSASTests();
+    super.setup();
+
+    // Set all required configs on the raw configuration
+    abfsConfig.set(
+        FS_AZURE_BLOB_FS_CLIENT_SERVICE_PRINCIPAL_OBJECT_ID + "." + 
accountName,
+        abfsConfig.get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID));
+    abfsConfig.set(FS_AZURE_BLOB_FS_CLIENT_SERVICE_PRINCIPAL_OBJECT_ID,
+        abfsConfig.get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID));
+    abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + "." + accountName,
+        abfsConfig.get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID));
+    abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID,
+        abfsConfig.get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID));
+    abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + "." + accountName,
+        abfsConfig.get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET));
+    abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET,
+        abfsConfig.get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET));
+    abfsConfig.set(FS_AZURE_TEST_END_USER_TENANT_ID,
+        abfsConfig.get(FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_TENANT_ID));
+    abfsConfig.set(FS_AZURE_TEST_END_USER_OBJECT_ID,
+        abfsConfig.get(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID));
+    abfsConfig.set(FS_AZURE_SAS_TOKEN_PROVIDER_TYPE,
+        MockUserBoundSASTokenProvider.class.getName());
+  }
+
+
+  /**
+   * Injects a mock AccessTokenProvider into the AbfsClient of the given 
filesystem.
+   * @param fs AzureBlobFileSystem instance
+   * @param mockProvider AccessTokenProvider to inject
+   * @throws Exception if reflection fails
+   */
+  private void injectMockTokenProvider(AzureBlobFileSystem fs,
+      AccessTokenProvider mockProvider) throws Exception {
+    Field abfsStoreField = AzureBlobFileSystem.class.getDeclaredField(
+        "abfsStore");
+    abfsStoreField.setAccessible(true);
+    AzureBlobFileSystemStore store
+        = (AzureBlobFileSystemStore) abfsStoreField.get(fs);
+
+    Field abfsClientField = AzureBlobFileSystemStore.class.getDeclaredField(
+        "client");
+    abfsClientField.setAccessible(true);
+    AbfsClient client = (AbfsClient) abfsClientField.get(store);
+
+    Field tokenProviderField = AbfsClient.class.getDeclaredField(
+        "tokenProvider");
+    tokenProviderField.setAccessible(true);
+    tokenProviderField.set(client, mockProvider);
+  }
+
+  /**
+   * Helper to create a new AzureBlobFileSystem instance for tests.
+   * @return AzureBlobFileSystem instance
+   * @throws RuntimeException if creation fails
+   */
+  private AzureBlobFileSystem createTestFileSystem() throws RuntimeException {
+    try {
+      return (AzureBlobFileSystem) FileSystem.newInstance(
+          getRawConfiguration());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Test that file creation fails when the end user object ID does not match 
the service principal object ID.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testShouldFailWhenSduoidMismatchesServicePrincipalId()
+      throws Exception {
+    this.getConfiguration()
+        .set(FS_AZURE_TEST_END_USER_OBJECT_ID, TEST_OBJECT_ID);
+    AzureBlobFileSystem testFs = createTestFileSystem();
+    intercept(AccessDeniedException.class,
+        () -> {
+          testFs.create(testPath);
+        });
+  }
+
+  /**
+   * Verifies that both OAuth token provider and user-bound SAS token provider 
are configured and usable.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testOAuthTokenProviderAndSASTokenFlow() throws Exception {
+    AzureBlobFileSystem testFs = createTestFileSystem();
+
+    AbfsConfiguration abfsConfiguration = testFs.getAbfsStore()
+        .getAbfsConfiguration();
+
+    // Verify AbfsConfiguration has an OAuth token provider configured
+    AccessTokenProvider tokenProvider = abfsConfiguration.getTokenProvider();
+    assertNotNull(tokenProvider,
+        "AccessTokenProvider must be configured for UserboundSASWithOAuth");
+
+    // Acquire an OAuth token and assert it is non-empty
+    AzureADToken token = tokenProvider.getToken();
+    assertNotNull(token, "OAuth token must not be null");
+    assertNotNull(token.getAccessToken(),
+        "OAuth access token must not be null");
+    assertFalse(token.getAccessToken().isEmpty(),
+        "OAuth access token must not be empty");
+
+    // Verify AbfsConfiguration has an SASTokenProvider configured
+    SASTokenProvider sasProvider
+        = abfsConfiguration.getUserBoundSASTokenProvider(
+        AuthType.UserboundSASWithOAuth);
+    assertNotNull(sasProvider,
+        "SASTokenProvider for user-bound SAS must be configured");
+    assertInstanceOf(MockUserBoundSASTokenProvider.class, sasProvider,
+        "Expected MockUserBoundSASTokenProvider to be used for tests");
+
+    // Request a SAS token and assert we get a non-empty result
+    String sasToken = sasProvider.getSASToken(
+        "abfsdrivercanaryhns.dfs.core.windows.net", "userbound", "/",
+        SASTokenProvider.GET_PROPERTIES_OPERATION);
+    assertNotNull(sasToken, "SAS token must not be null");
+    assertFalse(sasToken.isEmpty(), "SAS token must not be empty");
+  }
+
+/**
+ * Performs and validates basic file and directory operations, including 
rename.
+ * Operations tested: create, open, write, read, list, mkdir, existence check, 
ACL (if HNS), rename, and delete.
+ * @throws Exception if any operation fails
+ */
+  @Test
+  public void testBasicOperations() throws Exception {
+    AzureBlobFileSystem testFs = createTestFileSystem();
+
+    // 1. Create file
+    testFs.create(testPath).close();
+
+    // 2. Open file
+    testFs.open(testPath).close();
+
+    // 3. Get file status
+    testFs.getFileStatus(testPath);
+
+    // 4. Write to file (overwrite)
+    try (FSDataOutputStream out = testFs.create(testPath, true)) {
+      out.writeUTF("hello");
+    }
+
+    // 5. Read from file
+    try (FSDataInputStream in = testFs.open(testPath)) {
+      String content = in.readUTF();
+      assertEquals("hello", content);
+    }
+
+    // 6. List parent directory
+    FileStatus[] files = testFs.listStatus(testPath.getParent());
+    assertTrue(files.length > 0);
+
+    // 7. Check file existence
+    assertTrue(testFs.exists(testPath));
+
+    // 8. Create directory and a file under it
+    Path dirPath = new Path("/testDirAcl");
+    Path filePath = new Path(dirPath, "fileInDir.txt");
+
+    assertTrue(testFs.mkdirs(dirPath));
+
+    // 9. ACL operations (only for HNS accounts)
+    if (getConfiguration().getBoolean(
+        TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false)) 
{
+      // Set ACL
+      List<AclEntry> aclSpec = Arrays.asList(
+          AclEntry.parseAclEntry("user::rwx", true),
+          AclEntry.parseAclEntry("group::r-x", true),
+          AclEntry.parseAclEntry("other::---", true)
+      );
+      testFs.setAcl(dirPath, aclSpec);
+
+      // Get ACL
+      List<AclEntry> returnedAcl = testFs.getAclStatus(dirPath).getEntries();
+      assertNotNull(returnedAcl);
+    }
+
+    // 10. Rename file
+    Path renamedPath = new Path("/testRenamed.txt");
+    assertTrue(testFs.rename(testPath, renamedPath));
+    assertFalse(testFs.exists(testPath));
+    assertTrue(testFs.exists(renamedPath));
+
+    // 11. Delete file (non-recursive)
+    assertTrue(testFs.delete(renamedPath, false));
+    assertFalse(testFs.exists(renamedPath));
+
+    // 12. Delete directory (recursive)
+    assertTrue(testFs.delete(dirPath, true));
+    assertFalse(testFs.exists(dirPath));
+    assertFalse(testFs.exists(filePath));
+  }
+
+  /**
+   * Test that file creation fails when an invalid OAuth token is used.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testCreateFailsWithInvalidOAuthToken() throws Exception {
+    AzureBlobFileSystem testFs = createTestFileSystem();
+
+    // Create mock token provider with invalid token
+    AccessTokenProvider mockProvider = Mockito.mock(AccessTokenProvider.class);
+    AzureADToken mockToken = Mockito.mock(AzureADToken.class);
+    Mockito.when(mockToken.getAccessToken()).thenReturn(
+        INVALID_OAUTH_TOKEN_VALUE);
+    Mockito.when(mockProvider.getToken()).thenReturn(mockToken);
+
+    // Inject mock provider into AbfsClient
+    injectMockTokenProvider(testFs, mockProvider);
+
+    intercept(AccessDeniedException.class, () -> {
+      testFs.create(testPath);
+    });
+  }
+
+  /**
+   * Test that file creation fails when an invalid SAS token is used.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testGPSFailsWithInvalidSASToken() throws Exception {
+    AbfsConfiguration abfsConfig = this.getConfiguration();
+    abfsConfig.set(FS_AZURE_SAS_TOKEN_PROVIDER_TYPE,
+        MockInvalidSASTokenProvider.class.getName());
+    AzureBlobFileSystem invalidSASTokenFs = createTestFileSystem();
+    intercept(AccessDeniedException.class,
+        () -> {
+          invalidSASTokenFs.create(testPath);
+        }
+    );
+  }
+
+
+  /**
+   * Test file operations with a valid and then expired SAS token.
+   * Verifies that operations succeed with a valid token and fail with an 
expired token.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testOperationWithValidAndExpiredSASToken() throws Exception {
+    AzureBlobFileSystem testFs = createTestFileSystem();
+
+    // Get a real SAS token from the configured provider
+    AbfsConfiguration abfsConfig = 
testFs.getAbfsStore().getAbfsConfiguration();
+    SASTokenProvider realSasProvider
+        = abfsConfig.getUserBoundSASTokenProvider(
+        AuthType.UserboundSASWithOAuth);
+    assertNotNull(realSasProvider,
+        "SASTokenProvider for user-bound SAS must be configured");
+    String validSasToken = realSasProvider.getSASToken(
+        getAccountName(),
+        testFs.toString(),
+        String.valueOf(testPath),
+        SASTokenProvider.CREATE_FILE_OPERATION);
+    assertNotNull(validSasToken, "SAS token must not be null");
+    assertFalse(validSasToken.isEmpty(), "SAS token must not be empty");
+
+    // Operation should work with valid SAS token
+    testFs.create(testPath); // Should succeed
+
+    // Modify the ske/se fields to be expired and inject a mock provider
+    String expiredDate = OffsetDateTime.now(ZoneOffset.UTC)
+        .minusDays(1)
+        .format(DateTimeFormatter.ISO_DATE_TIME);
+    String expiredSasToken = Arrays.stream(validSasToken.split("&"))
+        .map(kv -> {
+          String[] pair = kv.split("=", 2);
+          if (pair[0].equals("ske") || pair[0].equals("se")) {
+            return pair[0] + "=" + expiredDate;
+          } else {
+            return kv;
+          }
+        })
+        .collect(Collectors.joining("&"));
+
+    // Create a mock SASTokenProvider that returns the expired SAS token
+    SASTokenProvider mockSasProvider = Mockito.mock(
+        SASTokenProvider.class);
+    Mockito.when(
+            mockSasProvider.getSASToken(Mockito.anyString(),
+                Mockito.anyString(), Mockito.anyString(),
+                Mockito.anyString()))
+        .thenReturn(expiredSasToken);
+
+    // Inject the mock provider into the AbfsClient
+    injectMockSASTokenProvider(testFs, mockSasProvider);
+
+    // Try a file operation and expect failure due to expired SAS token
+    intercept(AccessDeniedException.class,
+        () -> {
+          testFs.getFileStatus(testPath);
+        }
+    );
+  }
+
+  // Helper method to inject a mock SASTokenProvider into the AbfsClient
+  private void injectMockSASTokenProvider(AzureBlobFileSystem fs,
+      SASTokenProvider provider) throws Exception {
+    Field abfsStoreField = AzureBlobFileSystem.class.getDeclaredField(
+        "abfsStore");
+    abfsStoreField.setAccessible(true);
+    AzureBlobFileSystemStore store
+        = (AzureBlobFileSystemStore) abfsStoreField.get(fs);
+
+    Field abfsClientField = AzureBlobFileSystemStore.class.getDeclaredField(
+        "client");
+    abfsClientField.setAccessible(true);
+    AbfsClient client = (AbfsClient) abfsClientField.get(store);
+
+    Field sasProviderField = AbfsClient.class.getDeclaredField(
+        "sasTokenProvider");
+    sasProviderField.setAccessible(true);
+    sasProviderField.set(client, provider);
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
index ebccae55c0a..d82a4d2879b 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
@@ -55,6 +55,9 @@ public final class TestConfigurationKeys {
 
   public static final String FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_OBJECT_ID = 
"fs.azure.test.app.service.principal.object.id";
 
+  public static final String FS_AZURE_TEST_END_USER_TENANT_ID = 
"fs.azure.test.end.user.tenant.id";
+  public static final String FS_AZURE_TEST_END_USER_OBJECT_ID = 
"fs.azure.test.end.user.object.id";
+
   public static final String FS_AZURE_TEST_APP_ID = "fs.azure.test.app.id";
 
   public static final String FS_AZURE_TEST_APP_SECRET = 
"fs.azure.test.app.secret";
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
index 36c38e80b79..d221f3ea481 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
@@ -36,10 +36,11 @@
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
 import org.apache.hadoop.fs.azurebfs.services.AbfsJdkHttpOperation;
 import org.apache.hadoop.fs.azurebfs.utils.Base64;
-import org.apache.hadoop.fs.azurebfs.utils.DelegationSASGenerator;
+import org.apache.hadoop.fs.azurebfs.utils.DelegationSASGeneratorVersionJuly5;
 import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
 import org.apache.hadoop.security.AccessControlException;
 
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
 
@@ -48,7 +49,7 @@
  */
 public class MockDelegationSASTokenProvider implements SASTokenProvider {
 
-  private DelegationSASGenerator generator;
+  private DelegationSASGeneratorVersionJuly5 generator;
 
   public static final String TEST_OWNER = 
"325f1619-4205-432f-9fce-3fd594325ce5";
   public static final String CORRELATION_ID = 
"66ff4ffc-ff17-417e-a2a9-45db8c5b0b5c";
@@ -65,8 +66,7 @@ public void initialize(Configuration configuration, String 
accountName) throws I
     String skv = SASGenerator.AuthenticationVersion.Dec19.toString();
 
     byte[] key = getUserDelegationKey(accountName, appID, appSecret, sktid, 
skt, ske, skv);
-
-    generator = new DelegationSASGenerator(key, skoid, sktid, skt, ske, skv);
+    generator = new DelegationSASGeneratorVersionJuly5(key, skoid, sktid, skt, 
ske, skv, EMPTY_STRING, EMPTY_STRING);
   }
 
   // Invokes the AAD v2.0 authentication endpoint with a client credentials 
grant to get an
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockInvalidSASTokenProvider.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockInvalidSASTokenProvider.java
new file mode 100644
index 00000000000..7be3b0c9786
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockInvalidSASTokenProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.extensions;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A mock SAS token provider to test error conditions.
+ */
+public class MockInvalidSASTokenProvider implements SASTokenProvider {
+  private final String invalidSASToken = "testInvalidSASToken";
+
+  @Override
+  public void initialize(Configuration configuration, String accountName) {
+    //do nothing
+  }
+
+  /**
+   * Returns a fixed, invalid SAS token regardless of the arguments, so that
+   * callers can exercise error handling when the service rejects the token.
+   * @param accountName the name of the storage account.
+   * @param fileSystem the name of the fileSystem.
+   * @param path the file or directory path.
+   * @param operation the operation to be performed on the path.
+   * @return a fixed invalid SAS token string.
+   */
+  @Override
+  public String getSASToken(String accountName, String fileSystem, String path,
+      String operation) {
+    return invalidSASToken;
+  }
+
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockUserBoundSASTokenProvider.java
similarity index 57%
copy from 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
copy to 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockUserBoundSASTokenProvider.java
index 36c38e80b79..e4ede0345e8 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockUserBoundSASTokenProvider.java
@@ -36,24 +36,44 @@
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
 import org.apache.hadoop.fs.azurebfs.services.AbfsJdkHttpOperation;
 import org.apache.hadoop.fs.azurebfs.utils.Base64;
-import org.apache.hadoop.fs.azurebfs.utils.DelegationSASGenerator;
+import org.apache.hadoop.fs.azurebfs.utils.DelegationSASGeneratorVersionJuly5;
 import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
 import org.apache.hadoop.security.AccessControlException;
 
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_X_WWW_FORM_URLENCODED;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
 
 /**
- * A mock SAS token provider implementation.
+ * A mock user-bound SAS token provider implementation for testing purposes.
+ * Provides functionality to generate user delegation SAS tokens for Azure 
Blob Storage.
  */
-public class MockDelegationSASTokenProvider implements SASTokenProvider {
+public class MockUserBoundSASTokenProvider implements SASTokenProvider {
 
-  private DelegationSASGenerator generator;
+  // Constants for URLs and endpoints
+  private static final String AZURE_BLOB_ENDPOINT_TEMPLATE = 
"https://%s.blob.core.windows.net/";;
+  private static final String AZURE_LOGIN_ENDPOINT_TEMPLATE = 
"https://login.microsoftonline.com/%s/oauth2/v2.0/token";;
+  private static final String USER_DELEGATION_QUERY_PARAMS = 
"?restype=service&comp=userdelegationkey";
+
+
+  // HTTP related constants
+  private static final String UTF_8 = StandardCharsets.UTF_8.toString();
+  private static final int RESPONSE_BUFFER_SIZE = 4 * 1024;
 
   public static final String TEST_OWNER = 
"325f1619-4205-432f-9fce-3fd594325ce5";
   public static final String CORRELATION_ID = 
"66ff4ffc-ff17-417e-a2a9-45db8c5b0b5c";
   public static final String NO_AGENT_PATH = "NoAgentPath";
 
+  private DelegationSASGeneratorVersionJuly5 generator;
+
+  /**
+   * Initializes the SAS token provider with configuration settings.
+   *
+   * @param configuration Configuration containing Azure storage settings
+   * @param accountName The name of the storage account to initialize for
+   * @throws IOException if there is an error during initialization
+   */
   @Override
   public void initialize(Configuration configuration, String accountName) 
throws IOException {
     String appID = 
configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_ID);
@@ -62,61 +82,89 @@ public void initialize(Configuration configuration, String 
accountName) throws I
     String skoid = 
configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_OBJECT_ID);
     String skt = 
SASGenerator.ISO_8601_FORMATTER.format(Instant.now().minus(SASGenerator.FIVE_MINUTES));
     String ske = 
SASGenerator.ISO_8601_FORMATTER.format(Instant.now().plus(SASGenerator.ONE_DAY));
-    String skv = SASGenerator.AuthenticationVersion.Dec19.toString();
+    String skv = SASGenerator.AuthenticationVersion.Jul5.toString();
 
-    byte[] key = getUserDelegationKey(accountName, appID, appSecret, sktid, 
skt, ske, skv);
+    String skdutid = 
configuration.get(TestConfigurationKeys.FS_AZURE_TEST_END_USER_TENANT_ID);
+    String sduoid = 
configuration.get(TestConfigurationKeys.FS_AZURE_TEST_END_USER_OBJECT_ID);
 
-    generator = new DelegationSASGenerator(key, skoid, sktid, skt, ske, skv);
+    byte[] key = getUserDelegationKey(accountName, appID, appSecret, sktid, 
skt, ske, skv, skdutid);
+
+    generator = new DelegationSASGeneratorVersionJuly5(key, skoid, sktid, skt, 
ske, skv, skdutid, sduoid);
   }
 
-  // Invokes the AAD v2.0 authentication endpoint with a client credentials 
grant to get an
-  // access token.  See 
https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-client-creds-grant-flow.
+  /**
+   * Gets the authorization header for Azure AD authentication.
+   * Invokes the AAD v2.0 authentication endpoint with a client credentials
+   * grant to get an access token.
+   * See 
https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-client-creds-grant-flow.
+   *
+   * @param accountName The storage account name
+   * @param appID The Azure AD application ID
+   * @param appSecret The Azure AD application secret
+   * @param sktid The service principal tenant ID
+   * @return The authorization header string with bearer token
+   * @throws IOException if there is an error getting the authorization token
+   */
   private String getAuthorizationHeader(String accountName, String appID, 
String appSecret, String sktid) throws IOException {
-    String authEndPoint = 
String.format("https://login.microsoftonline.com/%s/oauth2/v2.0/token";, sktid);
+    String authEndPoint = String.format(AZURE_LOGIN_ENDPOINT_TEMPLATE, sktid);
     ClientCredsTokenProvider provider = new 
ClientCredsTokenProvider(authEndPoint, appID, appSecret);
     return "Bearer " + provider.getToken().getAccessToken();
   }
 
+  /**
+   * Retrieves a user delegation key from Azure Storage.
+   *
+   * @param accountName The storage account name
+   * @param appID The Azure AD application ID
+   * @param appSecret The Azure AD application secret
+   * @param sktid The service principal tenant ID
+   * @param skt The start time for the delegation key
+   * @param ske The expiry time for the delegation key
+   * @param skv The API version for the request
+   * @param skdutid The delegated user tenant ID
+   * @return The user delegation key as a byte array
+   * @throws IOException if there is an error retrieving the delegation key
+   */
   private byte[] getUserDelegationKey(String accountName, String appID, String 
appSecret,
-      String sktid, String skt, String ske, String skv) throws IOException {
+      String sktid, String skt, String ske, String skv, String skdutid) throws 
IOException {
 
-    String method = "POST";
     String account = accountName.substring(0, 
accountName.indexOf(AbfsHttpConstants.DOT));
-
-    final StringBuilder sb = new StringBuilder(128);
-    sb.append("https://";);
-    sb.append(account);
-    
sb.append(".blob.core.windows.net/?restype=service&comp=userdelegationkey");
+    String baseUrl = String.format(AZURE_BLOB_ENDPOINT_TEMPLATE, account);
+    String urlString = baseUrl + USER_DELEGATION_QUERY_PARAMS;
 
     URL url;
     try {
-      url = new URL(sb.toString());
+      url = new URL(urlString);
     } catch (MalformedURLException ex) {
-      throw new InvalidUriException(sb.toString());
+      throw new InvalidUriException(urlString);
     }
 
-    List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
+    List<AbfsHttpHeader> requestHeaders = new ArrayList<>();
     requestHeaders.add(new 
AbfsHttpHeader(HttpHeaderConfigurations.X_MS_VERSION, skv));
-    requestHeaders.add(new 
AbfsHttpHeader(HttpHeaderConfigurations.CONTENT_TYPE, 
"application/x-www-form-urlencoded"));
-    requestHeaders.add(new 
AbfsHttpHeader(HttpHeaderConfigurations.AUTHORIZATION, 
getAuthorizationHeader(account, appID, appSecret, sktid)));
+    requestHeaders.add(new 
AbfsHttpHeader(HttpHeaderConfigurations.CONTENT_TYPE, 
APPLICATION_X_WWW_FORM_URLENCODED));
+    requestHeaders.add(new 
AbfsHttpHeader(HttpHeaderConfigurations.AUTHORIZATION,
+        getAuthorizationHeader(account, appID, appSecret, sktid)));
 
     final StringBuilder requestBody = new StringBuilder(512);
     requestBody.append("<?xml version=\"1.0\" 
encoding=\"utf-8\"?><KeyInfo><Start>");
     requestBody.append(skt);
     requestBody.append("</Start><Expiry>");
     requestBody.append(ske);
-    requestBody.append("</Expiry></KeyInfo>");
+    requestBody.append("</Expiry><DelegatedUserTid>");
+    requestBody.append(skdutid);
+    requestBody.append("</DelegatedUserTid></KeyInfo>");
 
-    AbfsJdkHttpOperation op = new AbfsJdkHttpOperation(url, method, 
requestHeaders,
-        Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), 
Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
+    AbfsJdkHttpOperation op = new AbfsJdkHttpOperation(url, HTTP_METHOD_POST, 
requestHeaders,
+        Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT),
+        Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
 
-    byte[] requestBuffer = 
requestBody.toString().getBytes(StandardCharsets.UTF_8.toString());
+    byte[] requestBuffer = requestBody.toString().getBytes(UTF_8);
     op.sendPayload(requestBuffer, 0, requestBuffer.length);
 
-    byte[] responseBuffer = new byte[4 * 1024];
+    byte[] responseBuffer = new byte[RESPONSE_BUFFER_SIZE];
     op.processResponse(responseBuffer, 0, responseBuffer.length);
 
-    String responseBody = new String(responseBuffer, 0, (int) 
op.getBytesReceived(), StandardCharsets.UTF_8);
+    String responseBody = new String(responseBuffer, 0, (int) 
op.getBytesReceived(), UTF_8);
     int beginIndex = responseBody.indexOf("<Value>") + "<Value>".length();
     int endIndex = responseBody.indexOf("</Value>");
     String value = responseBody.substring(beginIndex, endIndex);
@@ -124,10 +172,8 @@ private byte[] getUserDelegationKey(String accountName, 
String appID, String app
   }
 
   /**
-   * Invokes the authorizer to obtain a SAS token.
+   * {@inheritDoc}
    *
-   * @param accountName the name of the storage account.
-   * @param fileSystem the name of the fileSystem.
    * @param path the file or directory path.
    * @param operation the operation to be performed on the path.
    * @return a SAS token to perform the request operation.
@@ -136,7 +182,7 @@ private byte[] getUserDelegationKey(String accountName, 
String appID, String app
    */
   @Override
   public String getSASToken(String accountName, String fileSystem, String path,
-                     String operation) throws IOException, 
AccessControlException {
+      String operation) throws IOException, AccessControlException {
     // Except for the special case where we test without an agent,
     // the user for these tests is always TEST_OWNER.  The check access 
operation
     // requires suoid to check permissions for the user and will throw if the
@@ -147,7 +193,8 @@ public String getSASToken(String accountName, String 
fileSystem, String path,
       saoid = (operation == SASTokenProvider.CHECK_ACCESS_OPERATION) ? null : 
TEST_OWNER;
       suoid = (operation == SASTokenProvider.CHECK_ACCESS_OPERATION) ? 
TEST_OWNER : null;
     }
+
     return generator.getDelegationSAS(accountName, fileSystem, path, operation,
         saoid, suoid, CORRELATION_ID);
   }
-}
\ No newline at end of file
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index 8505f5f3266..a5fbe30babc 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -32,6 +32,8 @@
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
 
@@ -45,13 +47,15 @@
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
 import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
-import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
@@ -61,12 +65,25 @@
 
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static 
org.apache.hadoop.fs.azurebfs.ITestAzureBlobFileSystemListStatus.TEST_CONTINUATION_TOKEN;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VENDOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VERSION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_ARCH;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_NAME;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_VERSION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SEMICOLON;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_NAME;
@@ -75,33 +92,21 @@
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_VALUE_UNKNOWN;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.JDK_HTTP_URL_CONNECTION;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
 import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
-import static 
org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
-import static 
org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.JDK_HTTP_URL_CONNECTION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
+import static org.apache.hadoop.fs.azurebfs.services.AuthType.SharedKey;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assumptions.assumeThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION;
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VENDOR;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VERSION;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_ARCH;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_NAME;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_VERSION;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SEMICOLON;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
-import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
-import static org.assertj.core.api.Assumptions.assumeThat;
-
 /**
  * Test useragent of abfs client.
  *
@@ -174,10 +179,10 @@ private String getUserAgentString(AbfsConfiguration 
config,
     AbfsClient client;
     if (AbfsServiceType.DFS.equals(config.getFsConfiguredServiceType())) {
       client = new AbfsDfsClient(new URL("https://azure.com";), null,
-          config, (AccessTokenProvider) null, null, abfsClientContext);
+          config, (AccessTokenProvider) null, null, null, abfsClientContext);
     } else {
       client = new AbfsBlobClient(new URL("https://azure.com";), null,
-          config, (AccessTokenProvider) null, null, abfsClientContext);
+          config, (AccessTokenProvider) null, null, null, abfsClientContext);
     }
     String sslProviderName = null;
     if (includeSSLProvider) {
@@ -420,7 +425,7 @@ public static AbfsClient createTestClientFromCurrentContext(
     if (AbfsServiceType.DFS.equals(abfsConfig.getFsConfiguredServiceType())) {
       testClient = new AbfsDfsClient(
           baseAbfsClientInstance.getBaseUrl(),
-          (currentAuthType == AuthType.SharedKey
+          (currentAuthType == SharedKey
               ? new SharedKeyCredentials(
               abfsConfig.getAccountName().substring(0,
                   abfsConfig.getAccountName().indexOf(DOT)),
@@ -431,11 +436,12 @@ public static AbfsClient 
createTestClientFromCurrentContext(
               ? abfsConfig.getTokenProvider()
               : null),
           null,
+          null,
           abfsClientContext);
     } else {
       testClient = new AbfsBlobClient(
           baseAbfsClientInstance.getBaseUrl(),
-          (currentAuthType == AuthType.SharedKey
+          (currentAuthType == SharedKey
               ? new SharedKeyCredentials(
               abfsConfig.getAccountName().substring(0,
                   abfsConfig.getAccountName().indexOf(DOT)),
@@ -446,6 +452,7 @@ public static AbfsClient createTestClientFromCurrentContext(
               ? abfsConfig.getTokenProvider()
               : null),
           null,
+          null,
           abfsClientContext);
     }
 
@@ -472,7 +479,7 @@ public static AbfsClient createBlobClientFromCurrentContext(
 
     AbfsClient testClient = new AbfsBlobClient(
         baseAbfsClientInstance.getBaseUrl(),
-        (currentAuthType == AuthType.SharedKey
+        (currentAuthType == SharedKey
             ? new SharedKeyCredentials(
             abfsConfig.getAccountName().substring(0,
                 abfsConfig.getAccountName().indexOf(DOT)),
@@ -483,6 +490,7 @@ public static AbfsClient createBlobClientFromCurrentContext(
             ? abfsConfig.getTokenProvider()
             : null),
         null,
+        null,
         abfsClientContext);
 
     return testClient;
@@ -496,7 +504,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient 
baseAbfsClientInstance,
 
     assumeThat(currentAuthType)
         .as("Auth type must be SharedKey or OAuth for this test")
-        .isIn(AuthType.SharedKey, AuthType.OAuth);
+        .isIn(SharedKey, AuthType.OAuth);
 
     AbfsClient client;
     if (AbfsServiceType.DFS.equals(abfsConfig.getFsConfiguredServiceType())) {
@@ -541,7 +549,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient 
baseAbfsClientInstance,
     ReflectionUtils.setFinalField(AbfsClient.class, client, "xMsVersion", 
baseAbfsClientInstance.getxMsVersion());
 
     // override auth provider
-    if (currentAuthType == AuthType.SharedKey) {
+    if (currentAuthType == SharedKey) {
       ReflectionUtils.setFinalField(AbfsClient.class, client, 
"sharedKeyCredentials", new SharedKeyCredentials(
               abfsConfig.getAccountName().substring(0,
                   abfsConfig.getAccountName().indexOf(DOT)),
@@ -765,6 +773,81 @@ public void testExpectHundredContinue() throws Exception {
             .isFalse();
   }
 
+  /**
+   * Parameterized test to verify the correct setup of authentication providers
+   * for each supported AuthType in the Azure Blob FileSystem configuration.
+   * For each AuthType, this test checks that the expected provider(s) are 
present
+   * and that unsupported providers throw the correct exceptions.
+   *
+   * OAuth: Token provider must be present, SAS provider must throw exception.
+   * SharedKey: Token provider must throw exception, SAS provider must throw 
exception.
+   * SAS: SAS provider must be present, token provider must throw exception.
+   * UserboundSASWithOAuth: Both AccessTokenProvider and SASTokenProvider must 
be present.
+   * Custom: Test is skipped.
+   *
+   * @param authType the authentication type to test
+   * @throws Exception if any error occurs during test execution
+   */
+  @ParameterizedTest
+  @EnumSource(AuthType.class)
+  public void testAuthTypeProviderSetup(AuthType authType) throws Exception {
+    if (authType.name().equals("Custom")) {
+      return;
+    }
+
+    this.getConfiguration().set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, 
SharedKey.name());
+    AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+        getRawConfiguration());
+    this.getConfiguration().set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, 
authType.name());
+
+    AbfsConfiguration abfsConfig = fs.getAbfsStore().getAbfsConfiguration();
+
+    switch (authType) {
+    case OAuth:
+      assertNotNull(abfsConfig.getTokenProvider(),
+          "OAuth should have token provider");
+      assertThrows(AzureBlobFileSystemException.class,
+          () -> abfsConfig.getSASTokenProvider(),
+          "SharedKey should not have SAS provider");
+      break;
+
+    case SharedKey:
+      assertThrows(TokenAccessProviderException.class,
+          () -> abfsConfig.getTokenProvider(),
+          "SharedKey should not have token provider");
+      assertThrows(AzureBlobFileSystemException.class,
+          () -> abfsConfig.getSASTokenProvider(),
+          "SharedKey should not have SAS provider");
+      break;
+
+    case SAS:
+      if (!abfsConfig.getIsNamespaceEnabledAccount().toBoolean()) {
+        assumeBlobServiceType();
+      }
+      assertThrows(TokenAccessProviderException.class,
+          () -> abfsConfig.getTokenProvider(),
+          "SharedKey should not have token provider");
+      assertNotNull(abfsConfig.getSASTokenProvider(),
+          "SAS should have SAS provider");
+      break;
+
+    case UserboundSASWithOAuth:
+      assumeHnsEnabled();
+      Object[] providers = abfsConfig.getUserBoundSASBothTokenProviders();
+      assertNotNull(providers, "Providers array must not be null");
+      assertTrue(providers[0] instanceof AccessTokenProvider,
+          "First should be AccessTokenProvider");
+      assertTrue(providers[1] instanceof SASTokenProvider,
+          "Second should be SASTokenProvider");
+      break;
+
+    default:
+      fail("Unexpected AuthType: " + authType);
+    }
+
+    fs.close();
+  }
+
   @Test
   public void testIsNonEmptyDirectory() throws IOException {
     testIsNonEmptyDirectoryInternal(EMPTY_STRING, true, EMPTY_STRING,
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsTailLatencyTracker.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsTailLatencyTracker.java
index 2e4a7516ed6..ac56af001c4 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsTailLatencyTracker.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsTailLatencyTracker.java
@@ -74,8 +74,8 @@ private void assertTailLatencyTimeoutEnabled(Configuration 
conf, boolean expecte
       Path testPath = new Path("/testFile");
       fs.create(testPath).close();
       AbfsClient client = fs.getAbfsStore().getClient();
-      AbfsRestOperation op = client.getAclStatus("/testFile",
-          getTestTracingContext(fs, false));
+      AbfsRestOperation op = client.getPathStatus("/testFile", false,
+          getTestTracingContext(fs, false), null);
       assertThat(op.isTailLatencyTimeoutEnabled()).isEqualTo(expected);
     }
   }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index fe481970e0b..039e01ed095 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -70,6 +70,7 @@ public void testTimerInitializationWithoutMetricCollection() 
throws Exception {
                 abfsConfiguration,
                 (AccessTokenProvider) null,
                 null,
+                null,
                 abfsClientContext);
 
         assertThat(client.getTimer())
@@ -106,6 +107,7 @@ public void testTimerInitializationWithMetricCollection() 
throws Exception {
                 abfsConfiguration,
                 (AccessTokenProvider) null,
                 null,
+                null,
                 abfsClientContext);
 
         assertThat(client.getTimer())
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index aa6ce16373a..4a902a8147d 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -919,8 +919,7 @@ public void 
testPrefetchReadAddsPriorityHeaderWithDifferentConfigs()
     configuration1.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "true");
 
     Configuration configuration2 = new Configuration(getRawConfiguration());
-    //use the default value for the config: false
-    configuration2.unset(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY);
+    configuration2.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "false");
 
     TracingContext tracingContext1 = mock(TracingContext.class);
     when(tracingContext1.getReadType()).thenReturn(PREFETCH_READ);
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGeneratorVersionJuly5.java
similarity index 64%
rename from 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java
rename to 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGeneratorVersionJuly5.java
index eec0d86f0b6..8694a8d4f53 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGeneratorVersionJuly5.java
@@ -19,37 +19,81 @@
 package org.apache.hadoop.fs.azurebfs.utils;
 
 import java.time.Instant;
+import java.util.Objects;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder;
 
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 
 /**
  * Test Delegation SAS generator.
  */
-public class DelegationSASGenerator extends SASGenerator {
+public class DelegationSASGeneratorVersionJuly5 extends SASGenerator {
   private final String skoid;
   private final String sktid;
   private final String skt;
   private final String ske;
   private final String sks = "b";
   private final String skv;
+  private final String skdutid;
+  private final String sduoid;
 
-  public DelegationSASGenerator(byte[] userDelegationKey, String skoid, String 
sktid, String skt, String ske, String skv) {
+  /**
+   * Initializes all the fields required for generating delegation SAS.
+   *
+   * @param userDelegationKey the user delegation key as a byte array
+   * @param skoid Azure AD delegator app's object ID
+   * @param sktid Azure AD delegator app's tenant ID
+   * @param skt The start time for the delegation key
+   * @param ske The expiry time for the delegation key
+   * @param skv the API version
+   * @param skdutid Azure AD delegated app's tenant ID
+   * @param sduoid Azure AD delegated app's user object ID
+   */
+  public DelegationSASGeneratorVersionJuly5(byte[] userDelegationKey,
+      String skoid,
+      String sktid,
+      String skt,
+      String ske,
+      String skv,
+      String skdutid,
+      String sduoid) {
     super(userDelegationKey);
     this.skoid = skoid;
     this.sktid = sktid;
     this.skt = skt;
     this.ske = ske;
     this.skv = skv;
+    this.skdutid = skdutid;
+    this.sduoid = sduoid;
   }
 
+  /**
+   * Generates a delegation SAS token for the specified resource and operation.
+   *
+   * @param accountName The storage account name
+   * @param containerName The container name
+   * @param path The path to the resource
+   * @param operation The operation to authorize
+   * @param saoid The Azure AD object ID of the application
+   * @param suoid The Azure AD object ID of the user
+   * @param scid The correlation ID
+   * @return The generated SAS token as a query string
+   * @throws IllegalArgumentException if the operation is not recognized
+   */
   public String getDelegationSAS(String accountName, String containerName, 
String path, String operation,
                                  String saoid, String suoid, String scid) {
 
-    final String sv = AuthenticationVersion.Feb20.toString();
+    // The params for signature computation (particularly the string-to-sign) 
are different based on the SAS version (sv)
+    // They might need to be changed if using a different version
+    // Ref: 
https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas
+
+    // SAS version (sv) used here is 2025-07-05
+    final String sv = AuthenticationVersion.Jul5.toString();
+
     final String st = 
ISO_8601_FORMATTER.format(Instant.now().minus(FIVE_MINUTES));
     final String se = ISO_8601_FORMATTER.format(Instant.now().plus(ONE_DAY));
     String sr = "b";
@@ -117,6 +161,15 @@ public String getDelegationSAS(String accountName, String 
containerName, String
     qb.addQuery("ske", ske);
     qb.addQuery("sks", sks);
     qb.addQuery("skv", skv);
+
+    // skdutid and sduoid are required for user-bound SAS only
+    if (!Objects.equals(skdutid, EMPTY_STRING)) {
+      qb.addQuery("skdutid", skdutid);
+    }
+    if (!Objects.equals(sduoid, EMPTY_STRING)) {
+      qb.addQuery("sduoid", sduoid);
+    }
+
     if (saoid != null) {
       qb.addQuery("saoid", saoid);
     }
@@ -138,6 +191,22 @@ public String getDelegationSAS(String accountName, String 
containerName, String
     return qb.toString().substring(1);
   }
 
+  /**
+   * Computes the signature for the SAS token based on the provided parameters.
+   *
+   * @param sp Signed permissions
+   * @param st Signed start time
+   * @param se Signed expiry time
+   * @param sv Signed version
+   * @param sr Signed resource
+   * @param accountName The storage account name
+   * @param containerName The container name
+   * @param path The path to the resource
+   * @param saoid The Azure AD object ID of the application
+   * @param suoid The Azure AD object ID of the user
+   * @param scid The correlation ID
+   * @return The computed HMAC256 signature
+   */
   private String computeSignatureForSAS(String sp, String st, String se, 
String sv,
       String sr, String accountName, String containerName,
       String path, String saoid, String suoid, String scid) {
@@ -183,17 +252,32 @@ private String computeSignatureForSAS(String sp, String 
st, String se, String sv
     }
     sb.append("\n");
 
+    // skdutid, sduoid are sent as empty strings for user-delegation SAS
+    // They are only required for user-bound SAS
+    if (!Objects.equals(skdutid, EMPTY_STRING)) {
+      sb.append(skdutid);
+    }
+    sb.append("\n");
+
+    if (!Objects.equals(sduoid, EMPTY_STRING)) {
+      sb.append(sduoid);
+    }
+    sb.append("\n");
+
+
     sb.append("\n"); // sip
     sb.append("\n"); // spr
     sb.append(sv);
     sb.append("\n");
     sb.append(sr);
     sb.append("\n");
+    sb.append("\n"); // - For optional : signedSnapshotTime
+    sb.append("\n"); // - For optional :signedEncryptionScope
     sb.append("\n"); // - For optional : rscc - ResponseCacheControl
     sb.append("\n"); // - For optional : rscd - ResponseContentDisposition
     sb.append("\n"); // - For optional : rsce - ResponseContentEncoding
     sb.append("\n"); // - For optional : rscl - ResponseContentLanguage
-    sb.append("\n"); // - For optional : rsct - ResponseContentType
+    // No escape sequence required for optional param rsct - ResponseContentType
 
     String stringToSign = sb.toString();
     LOG.debug("Delegation SAS stringToSign: " + stringToSign.replace("\n", 
"."));
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
index a80ddac5ed3..e3b6127e7cc 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
@@ -41,7 +41,8 @@ public abstract class SASGenerator {
   public enum AuthenticationVersion {
     Nov18("2018-11-09"),
     Dec19("2019-12-12"),
-    Feb20("2020-02-10");
+    Feb20("2020-02-10"),
+    Jul5("2025-07-05");
 
     private final String ver;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to