This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new adda712dcbf HADOOP-19735. ABFS: Adding request priority for prefetches 
(#8050)
adda712dcbf is described below

commit adda712dcbfdaed4ea4d88727c7684c972292bad
Author: manika137 <[email protected]>
AuthorDate: Tue Nov 4 23:09:45 2025 +0530

    HADOOP-19735. ABFS: Adding request priority for prefetches (#8050)
    
    Contributed by Manika Joshi
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      | 17 ++++
 .../fs/azurebfs/constants/ConfigurationKeys.java   | 12 +++
 .../constants/FileSystemConfigurations.java        |  7 ++
 .../constants/HttpHeaderConfigurations.java        |  1 +
 .../fs/azurebfs/services/AbfsBlobClient.java       |  3 +
 .../hadoop/fs/azurebfs/services/AbfsClient.java    | 17 ++++
 .../hadoop/fs/azurebfs/services/AbfsDfsClient.java |  4 +
 .../hadoop/fs/azurebfs/utils/TracingContext.java   |  9 ++
 .../fs/azurebfs/services/TestAbfsInputStream.java  | 97 ++++++++++++++++++++++
 9 files changed, 167 insertions(+)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 08f89171486..a54ee2bd444 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -638,6 +638,15 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT)
   private int tailLatencyMaxRetryCount;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY,
+      DefaultValue = DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY)
+  private boolean enablePrefetchRequestPriority;
+
+  // Prefetch priority is bounded by the standard (3) and lowest (7) request priority values.
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE,
+      MinValue = DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE,
+      MaxValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE,
+      DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE)
+  private int prefetchRequestPriorityValue;
+
   private String clientProvidedEncryptionKey;
   private String clientProvidedEncryptionKeySHA;
 
@@ -1355,6 +1364,14 @@ public boolean getIsCreateIdempotencyEnabled() {
     return enableCreateIdempotency;
   }
 
+  /**
+   * Returns the value of the enablePrefetchRequestPriority config field.
+   *
+   * @return true if request priority for prefetch requests is enabled.
+   */
+  public boolean isEnablePrefetchRequestPriority() {
+    return enablePrefetchRequestPriority;
+  }
+
+  /**
+   * Returns the configured prefetch request priority rendered as a decimal
+   * string.
+   *
+   * @return the prefetch request priority value as a string.
+   */
+  public String getPrefetchRequestPriorityValue() {
+    return String.valueOf(prefetchRequestPriorityValue);
+  }
+
   /**
    * Enum config to allow user to pick format of x-ms-client-request-id header
    * @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index e21dd7e5a05..91fc97e1b7c 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -282,6 +282,18 @@ public final class ConfigurationKeys {
    */
   public static final String FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = 
"fs.azure.enable.readahead.v2.dynamic.scaling";
 
+  /**
+   * Enable or disable request priority for prefetch requests
+   * Value: {@value}.
+   */
+  public static final String FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = 
"fs.azure.enable.prefetch.request.priority";
+
+  /**
+   * Request priority value for prefetch requests
+   * Value: {@value}.
+   */
+  public static final String FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE = 
"fs.azure.prefetch.request.priority.value";
+
   /**
    * Minimum number of prefetch threads in the thread pool for readahead V2.
    * {@value }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 8a93de648e4..8c5bc2229cf 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -393,6 +393,13 @@ public final class FileSystemConfigurations {
 
   public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY 
= true;
 
+  public static final boolean 
DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = true;
+
+  // The default traffic request priority is 3 (from service side)
+  // The lowest priority a request can get is 7
+  public static final int DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE = 7;
+  public static final int DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE = 3;
+
   public static final boolean DEFAULT_FS_AZURE_ENABLE_TAIL_LATENCY_TRACKER = 
false;
   public static final boolean 
DEFAULT_FS_AZURE_ENABLE_TAIL_LATENCY_REQUEST_TIMEOUT = false;
   public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_PERCENTILE = 99;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
index b442b1f8533..9521518fa1f 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
@@ -69,6 +69,7 @@ public final class HttpHeaderConfigurations {
   public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
   public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
   public static final String X_MS_LEASE_ID = "x-ms-lease-id";
+  public static final String X_MS_REQUEST_PRIORITY = "x-ms-request-priority";
 
   /**
    * Http Request Header for denoting the lease id of source in copy operation.
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index d6ae0427b23..52ec53e4f13 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -1335,6 +1335,9 @@ public AbfsRestOperation read(final String path,
     requestHeaders.add(rangeHeader);
     requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
 
+    // Add request priority header for prefetch reads
+    addRequestPriorityForPrefetch(requestHeaders, tracingContext);
+
     // Add request header to fetch MD5 Hash of data returned by server.
     if (isChecksumValidationEnabled(requestHeaders, rangeHeader, 
bufferLength)) {
       requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE));
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 33edc8dc06c..4d9ceee9e3f 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -62,6 +62,7 @@
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
@@ -139,6 +140,7 @@
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT_CHARSET;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_PRIORITY;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_ALGORITHM;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;
@@ -1399,6 +1401,21 @@ protected void 
addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
     }
   }
 
+  /**
+   * Appends the request priority header to a read request, but only when
+   * prefetch request priority is enabled in the configuration and the read
+   * type carried by the tracing context is a prefetch read.
+   *
+   * @param requestHeaders header list that receives the request priority header
+   * @param tracingContext tracing context whose read type is inspected
+   */
+  protected void addRequestPriorityForPrefetch(List<AbfsHttpHeader> requestHeaders,
+      TracingContext tracingContext) {
+    // Config check comes first so the tracing context is not consulted when disabled.
+    if (!getAbfsConfiguration().isEnablePrefetchRequestPriority()) {
+      return;
+    }
+    if (!ReadType.PREFETCH_READ.equals(tracingContext.getReadType())) {
+      return;
+    }
+    final String priorityValue
+        = getAbfsConfiguration().getPrefetchRequestPriorityValue();
+    requestHeaders.add(new AbfsHttpHeader(X_MS_REQUEST_PRIORITY, priorityValue));
+  }
+
   /**
    * To verify the checksum information received from server for the data read.
    * @param buffer stores the data received from server.
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index cd94ee1c8fd..08ef93d1008 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -1072,6 +1072,10 @@ public AbfsRestOperation read(final String path,
     }
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
+
+    // Add request priority header for prefetch reads
+    addRequestPriorityForPrefetch(requestHeaders, tracingContext);
+
     // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
     String sasTokenForReuse = appendSASTokenToQuery(path,
         SASTokenProvider.READ_OPERATION,
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
index 0179718a06e..62ac76fee99 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
@@ -389,4 +389,13 @@ public void setReadType(ReadType readType) {
       listener.updateReadType(readType);
     }
   }
+
+  /**
+   * Returns the read type currently held by this tracing context.
+   *
+   * @return the read type for the request.
+   */
+  public ReadType getReadType() {
+    return readType;
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 37d8046203e..aa6ce16373a 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -21,6 +21,8 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -58,6 +60,8 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_PRIORITY;
 import static org.apache.hadoop.fs.azurebfs.constants.ReadType.DIRECT_READ;
 import static org.apache.hadoop.fs.azurebfs.constants.ReadType.FOOTER_READ;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ReadType.MISSEDCACHE_READ;
@@ -66,8 +70,12 @@
 import static org.apache.hadoop.fs.azurebfs.constants.ReadType.SMALLFILE_READ;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -105,6 +113,7 @@ public class TestAbfsInputStream extends
   private static final int OPERATION_INDEX = 6;
   private static final int READTYPE_INDEX = 11;
 
+
   @AfterEach
   @Override
   public void teardown() throws Exception {
@@ -899,6 +908,94 @@ private void 
testReadTypeInTracingContextHeaderInternal(AzureBlobFileSystem fs,
     assertReadTypeInClientRequestId(fs, numOfReadCalls, totalReadCalls, 
readType);
   }
 
+  /*
+   * Test to verify that the priority header is added only when both conditions
+   * hold: the read is a prefetch read and the respective config is enabled.
+   */
+  @Test
+  public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs()
+      throws Exception {
+    Configuration configuration1 = new Configuration(getRawConfiguration());
+    configuration1.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "true");
+
+    Configuration configuration2 = new Configuration(getRawConfiguration());
+    // Disable explicitly: the config defaults to true, so unset would keep it on.
+    configuration2.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "false");
+
+    TracingContext tracingContext1 = mock(TracingContext.class);
+    when(tracingContext1.getReadType()).thenReturn(PREFETCH_READ);
+
+    //Prefetch Read with config enabled
+    executePrefetchReadTest(tracingContext1, configuration1, true);
+    //Prefetch Read with config disabled
+    executePrefetchReadTest(tracingContext1, configuration2, false);
+
+    when(tracingContext1.getReadType()).thenReturn(DIRECT_READ);
+
+    //Non-prefetch read with config disabled
+    executePrefetchReadTest(tracingContext1, configuration2, false);
+    //Non-prefetch read with config enabled
+    executePrefetchReadTest(tracingContext1, configuration1, false);
+  }
+
+  /*
+   * Helper method to execute a read and verify whether the priority header is
+   * added or not, as expected.
+   *
+   * tracingContext: tracing context passed to the read call.
+   * rawConfig: configuration used to create a new file system instance.
+   * shouldHaveHeader: true if the request priority header is expected among
+   * the captured request headers, false if it must be absent.
+   */
+  private void executePrefetchReadTest(TracingContext tracingContext,
+      Configuration rawConfig,
+      boolean shouldHaveHeader) throws Exception {
+    try (AzureBlobFileSystem azureFs = (AzureBlobFileSystem) 
FileSystem.newInstance(
+        rawConfig)) {
+      // Spy the store and its client so the client's behavior can be stubbed.
+      AzureBlobFileSystemStore store = Mockito.spy(azureFs.getAbfsStore());
+
+      AbfsClient abfsClient = Mockito.spy(store.getClient());
+      Mockito.doReturn(abfsClient).when(store).getClient();
+
+      // Collects the request headers of each rest operation the client creates.
+      List<AbfsHttpHeader> headersList = new ArrayList<>();
+
+      doAnswer(invocation -> {
+        // Create the real operation, then wrap it in a spy.
+        AbfsRestOperation realOp
+            = (AbfsRestOperation) invocation.callRealMethod();
+        AbfsRestOperation spiedOp = spy(realOp);
+
+        headersList.addAll(spiedOp.getRequestHeaders());
+
+        // Stub execute() to do nothing; only the captured headers are checked.
+        doNothing().when(spiedOp).execute(any(TracingContext.class));
+        return spiedOp;
+      })
+          .when(abfsClient)
+          .getAbfsRestOperation(
+              any(AbfsRestOperationType.class),
+              anyString(),
+              any(URL.class),
+              anyList(),
+              any(byte[].class),
+              anyInt(),
+              anyInt(),
+              nullable(String.class)
+          );
+
+      // Issue the read with the supplied tracing context.
+      abfsClient.read(
+          "dummy-path", 0L, new byte[1], 0, 1,
+          "etag", "leaseId", null, tracingContext);
+
+      AbfsConfiguration abfsConfig = store.getAbfsConfiguration();
+      if (shouldHaveHeader) {
+        // At least one header must be the priority header with the configured value.
+        assertThat(headersList)
+            .anySatisfy(header -> {
+              assertThat(header.getName()).isEqualTo(
+                  X_MS_REQUEST_PRIORITY);
+              assertThat(header.getValue()).isEqualTo(
+                  abfsConfig.getPrefetchRequestPriorityValue());
+            });
+      } else {
+        // No captured header may carry the priority header name.
+        assertThat(headersList)
+            .noneSatisfy(header -> assertThat(header.getName()).isEqualTo(
+                X_MS_REQUEST_PRIORITY));
+      }
+    }
+  }
+
   private Path createTestFile(AzureBlobFileSystem fs, int fileSize) throws 
Exception {
     Path testPath = new Path("testFile");
     byte[] fileContent = getRandomBytesArray(fileSize);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to