This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new adda712dcbf HADOOP-19735. ABFS: Adding request priority for prefetches (#8050)
adda712dcbf is described below
commit adda712dcbfdaed4ea4d88727c7684c972292bad
Author: manika137 <[email protected]>
AuthorDate: Tue Nov 4 23:09:45 2025 +0530
HADOOP-19735. ABFS: Adding request priority for prefetches (#8050)
Contributed by Manika Joshi
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 17 ++++
.../fs/azurebfs/constants/ConfigurationKeys.java | 12 +++
.../constants/FileSystemConfigurations.java | 7 ++
.../constants/HttpHeaderConfigurations.java | 1 +
.../fs/azurebfs/services/AbfsBlobClient.java | 3 +
.../hadoop/fs/azurebfs/services/AbfsClient.java | 17 ++++
.../hadoop/fs/azurebfs/services/AbfsDfsClient.java | 4 +
.../hadoop/fs/azurebfs/utils/TracingContext.java | 9 ++
.../fs/azurebfs/services/TestAbfsInputStream.java | 97 ++++++++++++++++++++++
9 files changed, 167 insertions(+)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 08f89171486..a54ee2bd444 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -638,6 +638,18 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT)
private int tailLatencyMaxRetryCount;
+ // Whether prefetch reads send the x-ms-request-priority header.
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY,
+ DefaultValue = DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY)
+ private boolean enablePrefetchRequestPriority;
+
+ // Priority sent on prefetch reads; bounded to [standard (3), lowest (7)].
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE,
+ MinValue = DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE,
+ MaxValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE,
+ DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE)
+ private int prefetchRequestPriorityValue;
+
private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;
@@ -1355,6 +1364,26 @@ public boolean getIsCreateIdempotencyEnabled() {
return enableCreateIdempotency;
}
+ /**
+ * Whether prefetch read requests should carry the
+ * x-ms-request-priority header.
+ *
+ * @return value of {@code fs.azure.enable.prefetch.request.priority}.
+ */
+ public boolean isEnablePrefetchRequestPriority() {
+ return enablePrefetchRequestPriority;
+ }
+
+ /**
+ * Priority value sent in the x-ms-request-priority header on prefetch reads.
+ *
+ * @return {@code fs.azure.prefetch.request.priority.value} as a decimal
+ * string, ready to be used directly as the header value.
+ */
+ public String getPrefetchRequestPriorityValue() {
+ return Integer.toString(prefetchRequestPriorityValue);
+ }
+
/**
* Enum config to allow user to pick format of x-ms-client-request-id header
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index e21dd7e5a05..91fc97e1b7c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -282,6 +282,18 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING =
"fs.azure.enable.readahead.v2.dynamic.scaling";
+ /**
+ * Enable or disable the x-ms-request-priority header on prefetch read requests.
+ * Value: {@value}.
+ */
+ public static final String FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY =
"fs.azure.enable.prefetch.request.priority";
+
+ /**
+ * Request priority value sent in the x-ms-request-priority header on
+ * prefetch read requests. Value: {@value}.
+ */
+ public static final String FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE =
"fs.azure.prefetch.request.priority.value";
+
/**
* Minimum number of prefetch threads in the thread pool for readahead V2.
* {@value }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 8a93de648e4..8c5bc2229cf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -393,6 +393,13 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY
= true;
+ public static final boolean
DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = true;
+
+ // Request priority values as defined by the service: 3 is the default
+ // (standard) traffic priority and 7 is the lowest priority a request can get.
+ public static final int DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE = 7; // default prefetch priority
+ public static final int DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE = 3; // minimum accepted prefetch priority
+
public static final boolean DEFAULT_FS_AZURE_ENABLE_TAIL_LATENCY_TRACKER =
false;
public static final boolean
DEFAULT_FS_AZURE_ENABLE_TAIL_LATENCY_REQUEST_TIMEOUT = false;
public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_PERCENTILE = 99;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
index b442b1f8533..9521518fa1f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
@@ -69,6 +69,7 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
+ public static final String X_MS_REQUEST_PRIORITY = "x-ms-request-priority"; // set on prefetch read requests
/**
* Http Request Header for denoting the lease id of source in copy operation.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index d6ae0427b23..52ec53e4f13 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -1335,6 +1335,9 @@ public AbfsRestOperation read(final String path,
requestHeaders.add(rangeHeader);
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+ // Add x-ms-request-priority only for prefetch reads, and only when enabled in config
+ addRequestPriorityForPrefetch(requestHeaders, tracingContext);
+
// Add request header to fetch MD5 Hash of data returned by server.
if (isChecksumValidationEnabled(requestHeaders, rangeHeader,
bufferLength)) {
requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE));
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 33edc8dc06c..4d9ceee9e3f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
@@ -139,6 +140,7 @@
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT_CHARSET;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_PRIORITY;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_ALGORITHM;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;
@@ -1399,6 +1401,21 @@ protected void
addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
}
}
+ /**
+ * Add request priority header for prefetch read requests if enabled.
+ * A null tracing context carries no read type and is treated as non-prefetch.
+ * @param requestHeaders to be updated with request priority header
+ * @param tracingContext tracing context to check read type; may be null
+ */
+ protected void addRequestPriorityForPrefetch(List<AbfsHttpHeader> requestHeaders,
+ TracingContext tracingContext) {
+ if (tracingContext != null && getAbfsConfiguration().isEnablePrefetchRequestPriority()
+ && ReadType.PREFETCH_READ.equals(tracingContext.getReadType())) {
+ requestHeaders.add(new AbfsHttpHeader(X_MS_REQUEST_PRIORITY,
+ getAbfsConfiguration().getPrefetchRequestPriorityValue()));
+ }
+ }
+
/**
* To verify the checksum information received from server for the data read.
* @param buffer stores the data received from server.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index cd94ee1c8fd..08ef93d1008 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -1072,6 +1072,10 @@ public AbfsRestOperation read(final String path,
}
final AbfsUriQueryBuilder abfsUriQueryBuilder =
createDefaultUriQueryBuilder();
+
+ // Add x-ms-request-priority only for prefetch reads, and only when enabled in config
+ addRequestPriorityForPrefetch(requestHeaders, tracingContext);
+
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
String sasTokenForReuse = appendSASTokenToQuery(path,
SASTokenProvider.READ_OPERATION,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
index 0179718a06e..62ac76fee99 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
@@ -389,4 +389,13 @@ public void setReadType(ReadType readType) {
listener.updateReadType(readType);
}
}
+
+ /**
+ * Returns the read type for the current operation; AbfsClient checks it to
+ * decide whether a read request is a prefetch read.
+ * @return the read type for the request.
+ */
+ public ReadType getReadType() {
+ return readType;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 37d8046203e..aa6ce16373a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -58,6 +60,8 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_PRIORITY;
import static org.apache.hadoop.fs.azurebfs.constants.ReadType.DIRECT_READ;
import static org.apache.hadoop.fs.azurebfs.constants.ReadType.FOOTER_READ;
import static
org.apache.hadoop.fs.azurebfs.constants.ReadType.MISSEDCACHE_READ;
@@ -66,8 +70,12 @@
import static org.apache.hadoop.fs.azurebfs.constants.ReadType.SMALLFILE_READ;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -105,6 +113,7 @@ public class TestAbfsInputStream extends
private static final int OPERATION_INDEX = 6;
private static final int READTYPE_INDEX = 11;
+
@AfterEach
@Override
public void teardown() throws Exception {
@@ -899,6 +908,94 @@ private void
testReadTypeInTracingContextHeaderInternal(AzureBlobFileSystem fs,
assertReadTypeInClientRequestId(fs, numOfReadCalls, totalReadCalls,
readType);
}
+ /**
+ * Test to verify that both conditions (prefetch read and the respective config
+ * enabled) need to be true for the priority header to be added.
+ */
+ @Test
+ public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs()
+ throws Exception {
+ Configuration configuration1 = new Configuration(getRawConfiguration());
+ configuration1.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "true");
+
+ Configuration configuration2 = new Configuration(getRawConfiguration());
+ // Explicitly disable: the config defaults to true, so unset() would leave it enabled.
+ configuration2.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "false");
+
+ TracingContext tracingContext1 = mock(TracingContext.class);
+ when(tracingContext1.getReadType()).thenReturn(PREFETCH_READ);
+
+ // Prefetch read with config enabled
+ executePrefetchReadTest(tracingContext1, configuration1, true);
+ // Prefetch read with config disabled
+ executePrefetchReadTest(tracingContext1, configuration2, false);
+
+ when(tracingContext1.getReadType()).thenReturn(DIRECT_READ);
+
+ // Non-prefetch read with config disabled
+ executePrefetchReadTest(tracingContext1, configuration2, false);
+ // Non-prefetch read with config enabled
+ executePrefetchReadTest(tracingContext1, configuration1, false);
+ }
+
+ /*
+ * Helper: issues a read through a spied client (network execution stubbed out) and asserts whether the priority header was added, as expected.
+ */
+ private void executePrefetchReadTest(TracingContext tracingContext,
+ Configuration rawConfig,
+ boolean shouldHaveHeader) throws Exception {
+ try (AzureBlobFileSystem azureFs = (AzureBlobFileSystem)
FileSystem.newInstance(
+ rawConfig)) {
+ AzureBlobFileSystemStore store = Mockito.spy(azureFs.getAbfsStore());
+
+ AbfsClient abfsClient = Mockito.spy(store.getClient()); // spy so getAbfsRestOperation can be intercepted
+ Mockito.doReturn(abfsClient).when(store).getClient();
+
+ List<AbfsHttpHeader> headersList = new ArrayList<>();
+
+ doAnswer(invocation -> {
+ AbfsRestOperation realOp
+ = (AbfsRestOperation) invocation.callRealMethod();
+ AbfsRestOperation spiedOp = spy(realOp);
+
+ headersList.addAll(spiedOp.getRequestHeaders()); // capture the headers the read op was built with
+
+ doNothing().when(spiedOp).execute(any(TracingContext.class)); // no network call
+ return spiedOp;
+ })
+ .when(abfsClient)
+ .getAbfsRestOperation(
+ any(AbfsRestOperationType.class),
+ anyString(),
+ any(URL.class),
+ anyList(),
+ any(byte[].class),
+ anyInt(),
+ anyInt(),
+ nullable(String.class)
+ );
+
+ abfsClient.read(
+ "dummy-path", 0L, new byte[1], 0, 1,
+ "etag", "leaseId", null, tracingContext);
+
+ AbfsConfiguration abfsConfig = store.getAbfsConfiguration();
+ if (shouldHaveHeader) {
+ assertThat(headersList)
+ .anySatisfy(header -> {
+ assertThat(header.getName()).isEqualTo(
+ X_MS_REQUEST_PRIORITY);
+ assertThat(header.getValue()).isEqualTo(
+ abfsConfig.getPrefetchRequestPriorityValue());
+ });
+ } else {
+ assertThat(headersList)
+ .noneSatisfy(header -> assertThat(header.getName()).isEqualTo(
+ X_MS_REQUEST_PRIORITY));
+ }
+ }
+ }
+
private Path createTestFile(AzureBlobFileSystem fs, int fileSize) throws
Exception {
Path testPath = new Path("testFile");
byte[] fileContent = getRandomBytesArray(fileSize);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]