This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 99d091f2d9c HADOOP-19767: [ABFS] Introduce Abfs Input Policy for detecting read patterns (#8153)
99d091f2d9c is described below

commit 99d091f2d9c846d29386ce767939456aa811fbc3
Author: Anuj Modi <[email protected]>
AuthorDate: Fri Jan 23 17:04:46 2026 +0530

    HADOOP-19767: [ABFS] Introduce Abfs Input Policy for detecting read patterns (#8153)
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  21 ++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  39 ++-
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   7 +
 .../constants/FileSystemConfigurations.java        |   2 +
 .../hadoop/fs/azurebfs/constants/ReadType.java     |   4 +
 .../exceptions/HttpResponseException.java          |  10 +
 .../azurebfs/services/AbfsAdaptiveInputStream.java | 117 +++++++++
 .../fs/azurebfs/services/AbfsInputStream.java      | 267 ++++++++++++++++-----
 .../hadoop/fs/azurebfs/services/AbfsLease.java     |   7 +-
 .../azurebfs/services/AbfsPrefetchInputStream.java | 100 ++++++++
 .../azurebfs/services/AbfsRandomInputStream.java   | 105 ++++++++
 .../fs/azurebfs/services/AbfsReadPolicy.java       |  78 ++++++
 .../fs/azurebfs/services/AbfsRetryPolicy.java      |   5 +
 .../fs/azurebfs/services/ReadBufferManager.java    |  19 +-
 .../fs/azurebfs/services/ReadBufferManagerV1.java  |  17 ++
 .../fs/azurebfs/services/ReadBufferManagerV2.java  |  75 ++++--
 .../azurebfs/ITestAbfsInputStreamStatistics.java   |   3 +-
 .../fs/azurebfs/services/ITestAbfsInputStream.java |   4 +-
 .../fs/azurebfs/services/TestAbfsInputStream.java  | 142 ++++++++++-
 .../azurebfs/services/TestReadBufferManagerV2.java |   2 +
 20 files changed, 901 insertions(+), 123 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index e9dd94ff4ed..6c5929d2b90 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -679,6 +679,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE)
   private int prefetchRequestPriorityValue;
 
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_POLICY,
+          DefaultValue = DEFAULT_AZURE_READ_POLICY)
+  private String abfsReadPolicy;
+
   private String clientProvidedEncryptionKey;
   private String clientProvidedEncryptionKeySHA;
 
@@ -1433,6 +1437,14 @@ public String getPrefetchRequestPriorityValue() {
     return Integer.toString(prefetchRequestPriorityValue);
   }
 
+  /**
+   * Get the ABFS read policy set by the user.
+   * @return the ABFS read policy.
+   */
+  public String getAbfsReadPolicy() {
+    return abfsReadPolicy;
+  }
+
   /**
    * Enum config to allow user to pick format of x-ms-client-request-id header
    * @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
@@ -2139,6 +2151,15 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled)
     this.isChecksumValidationEnabled = isChecksumValidationEnabled;
   }
 
+  /**
+   * Sets the ABFS read policy for testing purposes.
+   * @param readPolicy the read policy to set.
+   */
+  @VisibleForTesting
+  public void setAbfsReadPolicy(String readPolicy) {
+    abfsReadPolicy = readPolicy;
+  }
+
   public boolean isFullBlobChecksumValidationEnabled() {
     return isFullBlobChecksumValidationEnabled;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index de4bc79d55a..6ec5c51eb1d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -77,7 +77,6 @@
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
 import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
@@ -90,6 +89,7 @@
 import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
+import org.apache.hadoop.fs.azurebfs.services.AbfsAdaptiveInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientContext;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientContextBuilder;
@@ -97,6 +97,7 @@
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientRenameResult;
 import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsReadPolicy;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
@@ -107,10 +108,13 @@
 import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
 import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
 import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
+import org.apache.hadoop.fs.azurebfs.services.AbfsPrefetchInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRandomInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
 import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
+import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
 import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
 import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
 import org.apache.hadoop.fs.azurebfs.services.TailLatencyRequestTimeoutRetryPolicy;
@@ -950,8 +954,37 @@ public AbfsInputStream openFileForRead(Path path,
 
       perfInfo.registerSuccess(true);
 
-      // Add statistics for InputStream
-      return new AbfsInputStream(getClient(), statistics, relativePath,
+      return getRelevantInputStream(statistics, relativePath, contentLength,
+          parameters, contextEncryptionAdapter, eTag, tracingContext);
+    }
+  }
+
+  private AbfsInputStream getRelevantInputStream(final FileSystem.Statistics statistics,
+      final String relativePath,
+      final long contentLength,
+      final Optional<OpenFileParameters> parameters,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final String eTag,
+      TracingContext tracingContext) {
+    AbfsReadPolicy inputPolicy = AbfsReadPolicy.getAbfsReadPolicy(getAbfsConfiguration().getAbfsReadPolicy());
+    switch (inputPolicy) {
+    case SEQUENTIAL:
+      return new AbfsPrefetchInputStream(getClient(), statistics, relativePath,
+          contentLength, populateAbfsInputStreamContext(
+          parameters.map(OpenFileParameters::getOptions),
+          contextEncryptionAdapter),
+          eTag, tracingContext);
+
+    case RANDOM:
+      return new AbfsRandomInputStream(getClient(), statistics, relativePath,
+          contentLength, populateAbfsInputStreamContext(
+          parameters.map(OpenFileParameters::getOptions),
+          contextEncryptionAdapter),
+          eTag, tracingContext);
+
+    case ADAPTIVE:
+    default:
+      return new AbfsAdaptiveInputStream(getClient(), statistics, relativePath,
           contentLength, populateAbfsInputStreamContext(
           parameters.map(OpenFileParameters::getOptions),
           contextEncryptionAdapter),
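
For illustration, here is a minimal sketch (not part of this patch) of how a client could steer the
stream selection above through the new fs.azure.read.policy key; the account URI and file path
below are hypothetical:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadPolicyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "random" maps to AbfsRandomInputStream, "sequential" to AbfsPrefetchInputStream,
        // and anything else (including the default "adaptive") to AbfsAdaptiveInputStream.
        conf.set("fs.azure.read.policy", "random");
        FileSystem fs = FileSystem.get(
            URI.create("abfs://container@account.dfs.core.windows.net/"), conf); // hypothetical account
        try (FSDataInputStream in = fs.open(new Path("/data/part-0000.parquet"))) { // hypothetical path
          in.seek(4L * 1024 * 1024); // a seek-heavy access pattern suits the random policy
          byte[] buf = new byte[8192];
          in.readFully(buf);
        }
      }
    }
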
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index c5eb9235fbb..115568c7853 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.OpenFileOptions;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;
 
@@ -266,6 +267,12 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
   public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
   public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
+  /**
+   * Provides a hint for the read workload pattern.
+   * Possible values are exposed in {@link OpenFileOptions#FS_OPTION_OPENFILE_READ_POLICIES}.
+   */
+  public static final String FS_AZURE_READ_POLICY = "fs.azure.read.policy";
+
   /** Provides a config control to enable or disable ABFS Flush operations -
    *  HFlush and HSync. Default is true. **/
   public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 6f76f2e033c..a7717c124db 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 
 /**
@@ -108,6 +109,7 @@ public final class FileSystemConfigurations {
   public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB
   public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
   public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000;
+  public static final String DEFAULT_AZURE_READ_POLICY = FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
 
   public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "AES256";
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
index 332a5a5ac56..51391cc7477 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
@@ -48,6 +48,10 @@ public enum ReadType {
    * Only triggered when small file read optimization kicks in.
    */
   SMALLFILE_READ("SR"),
+  /**
+   * Reads from Random Input Stream with read ahead up to readAheadRange.
+   */
+  RANDOM_READ("RR"),
   /**
    * None of the above read types were applicable.
    */
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java
index c257309c8c9..76126049f07 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java
@@ -28,12 +28,22 @@
  */
 public class HttpResponseException extends IOException {
   private final HttpResponse httpResponse;
+
+  /**
+   * Constructor for HttpResponseException.
+   * @param s the exception message
+   * @param httpResponse the HttpResponse object
+   */
+  public HttpResponseException(final String s, final HttpResponse httpResponse) {
     super(s);
     Objects.requireNonNull(httpResponse, "httpResponse should be non-null");
     this.httpResponse = httpResponse;
   }
 
+  /**
+   * Gets the HttpResponse associated with this exception.
+   * @return the HttpResponse
+   */
   public HttpResponse getHttpResponse() {
     return httpResponse;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java
new file mode 100644
index 00000000000..25b4529aa08
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation, used when the user does not specify any input policy.
+ * It switches between sequential and random read optimizations based on the detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+  /**
+   * Constructs AbfsAdaptiveInputStream instance.
+   * @param client to be used for read operations
+   * @param statistics to record input stream statistics
+   * @param path file path
+   * @param contentLength file content length
+   * @param abfsInputStreamContext input stream context
+   * @param eTag file eTag
+   * @param tracingContext tracing context to trace the read operations
+   */
+  public AbfsAdaptiveInputStream(
+      final AbfsClient client,
+      final FileSystem.Statistics statistics,
+      final String path,
+      final long contentLength,
+      final AbfsInputStreamContext abfsInputStreamContext,
+      final String eTag,
+      TracingContext tracingContext) {
+    super(client, statistics, path, contentLength,
+        abfsInputStreamContext, eTag, tracingContext);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    // If buffer is empty, then fill the buffer.
+    if (getBCursor() == getLimit()) {
+      // If EOF, then return -1
+      if (getFCursor() >= getContentLength()) {
+        return -1;
+      }
+
+      long bytesRead = 0;
+      // reset buffer to initial state - i.e., throw away existing data
+      setBCursor(0);
+      setLimit(0);
+      if (getBuffer() == null) {
+        LOG.debug("created new buffer size {}", getBufferSize());
+        setBuffer(new byte[getBufferSize()]);
+      }
+
+      // Reset Read Type back to normal and set again based on code flow.
+      getTracingContext().setReadType(ReadType.NORMAL_READ);
+      if (shouldAlwaysReadBufferSize()) {
+        bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false);
+      } else {
+        // Enable readAhead when reading sequentially
+        if (-1 == getFCursorAfterLastRead() || getFCursorAfterLastRead() == getFCursor() || b.length >= getBufferSize()) {
+          LOG.debug("Sequential read with read ahead size of {}", getBufferSize());
+          bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false);
+        } else {
+          /*
+           * Disable queuing prefetches when random read pattern detected.
+           * Instead, read ahead only for readAheadRange above what is asked by caller.
+           */
+          getTracingContext().setReadType(ReadType.RANDOM_READ);
+          int lengthWithReadAhead = Math.min(b.length + getReadAheadRange(), getBufferSize());
+          LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead);
+          bytesRead = readInternal(getFCursor(), getBuffer(), 0, lengthWithReadAhead, true);
+        }
+      }
+      if (isFirstRead()) {
+        setFirstRead(false);
+      }
+      if (bytesRead == -1) {
+        return -1;
+      }
+
+      setLimit(getLimit() + (int) bytesRead);
+      setFCursor(getFCursor() + bytesRead);
+      setFCursorAfterLastRead(getFCursor());
+    }
+    return copyToUserBuffer(b, off, len);
+  }
+}
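
For intuition, the sequential-versus-random detection above reduces to the following predicate;
a minimal sketch with the accessor calls flattened into parameters, not part of this patch:

    // A read is treated as sequential when it is the first read (cursor marker -1),
    // when the file cursor sits exactly where the previous read finished, or when the
    // caller's buffer spans at least one full block; otherwise it is treated as random.
    static boolean isSequentialRead(long fCursorAfterLastRead, long fCursor,
        int callerBufferLength, int bufferSize) {
      return fCursorAfterLastRead == -1
          || fCursorAfterLastRead == fCursor
          || callerBufferLength >= bufferSize;
    }
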
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 31b6f0f0739..3c009c71bcd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -26,7 +26,6 @@
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.azurebfs.constants.ReadType;
 import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.util.Preconditions;
@@ -62,9 +61,9 @@
 /**
  * The AbfsInputStream for AbfsClient.
  */
-public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
+public abstract class AbfsInputStream extends FSInputStream implements CanUnbuffer,
         StreamCapabilities, IOStatisticsSource {
-  private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
+  protected static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
   //  Footer size is set to qualify for both ORC and parquet files
   public static final int FOOTER_SIZE = 16 * ONE_KB;
   public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2;
@@ -73,6 +72,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private final AbfsClient client;
   private final Statistics statistics;
   private final String path;
+
   private final long contentLength;
   private final int bufferSize; // default buffer size
   private final int footerReadSize; // default buffer size to read when reading footer
@@ -94,7 +94,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   // User configured size of read ahead.
   private final int readAheadRange;
 
-  private boolean firstRead = true;
+  private boolean firstRead = true; // to identify first read for optimizations
+
   // SAS tokens can be re-used until they expire
   private CachedSASToken cachedSasToken;
   private byte[] buffer = null;            // will be initialized on first use
@@ -134,6 +135,16 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private final BackReference fsBackRef;
   private final ReadBufferManager readBufferManager;
 
+  /**
+   * Constructor for AbfsInputStream.
+   * @param client the ABFS client
+   * @param statistics the statistics
+   * @param path the file path
+   * @param contentLength the content length
+   * @param abfsInputStreamContext the input stream context
+   * @param eTag the eTag of the file
+   * @param tracingContext the tracing context
+   */
   public AbfsInputStream(
           final AbfsClient client,
           final Statistics statistics,
@@ -197,6 +208,10 @@ public AbfsInputStream(
     }
   }
 
+  /**
+   * Returns the path of file associated with this stream.
+   * @return the path of the file
+   */
   public String getPath() {
     return path;
   }
@@ -258,7 +273,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
     // check if buffer is null before logging the length
     if (b != null) {
       LOG.debug("read requested b.length = {} offset = {} len = {}", b.length,
-          off, len);
+              off, len);
     } else {
       LOG.debug("read requested b = null offset = {} len = {}", off, len);
     }
@@ -327,58 +342,15 @@ private boolean shouldReadLastBlock() {
         && this.fCursor >= footerStart;
   }
 
-  private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
-    if (len == 0) {
-      return 0;
-    }
-    if (!validate(b, off, len)) {
-      return -1;
-    }
-    //If buffer is empty, then fill the buffer.
-    if (bCursor == limit) {
-      //If EOF, then return -1
-      if (fCursor >= contentLength) {
-        return -1;
-      }
-
-      long bytesRead = 0;
-      //reset buffer to initial state - i.e., throw away existing data
-      bCursor = 0;
-      limit = 0;
-      if (buffer == null) {
-        LOG.debug("created new buffer size {}", bufferSize);
-        buffer = new byte[bufferSize];
-      }
-
-      // Reset Read Type back to normal and set again based on code flow.
-      tracingContext.setReadType(ReadType.NORMAL_READ);
-      if (alwaysReadBufferSize) {
-        bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
-      } else {
-        // Enable readAhead when reading sequentially
-        if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
-          LOG.debug("Sequential read with read ahead size of {}", bufferSize);
-          bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
-        } else {
-          // Enabling read ahead for random reads as well to reduce number of remote calls.
-          int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize);
-          LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead);
-          bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true);
-        }
-      }
-      if (firstRead) {
-        firstRead = false;
-      }
-      if (bytesRead == -1) {
-        return -1;
-      }
-
-      limit += bytesRead;
-      fCursor += bytesRead;
-      fCursorAfterLastRead = fCursor;
-    }
-    return copyToUserBuffer(b, off, len);
-  }
+  /**
+   * Read one block of data into buffer.
+   * @param b buffer
+   * @param off offset
+   * @param len length
+   * @return number of bytes read
+   * @throws IOException if there is an error
+   */
+  protected abstract int readOneBlock(byte[] b, int off, int len) throws IOException;
 
   private int readFileCompletely(final byte[] b, final int off, final int len)
       throws IOException {
@@ -472,7 +444,15 @@ private void restorePointerState() {
     this.bCursor = this.bCursorBkp;
   }
 
-  private boolean validate(final byte[] b, final int off, final int len)
+  /**
+   * Validate the read parameters.
+   * @param b buffer byte array
+   * @param off offset in buffer
+   * @param len length to read
+   * @return true if valid else false
+   * @throws IOException if there is an error
+   */
+  protected boolean validate(final byte[] b, final int off, final int len)
       throws IOException {
     if (closed) {
       throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
@@ -492,7 +472,14 @@ private boolean validate(final byte[] b, final int off, final int len)
     return true;
   }
 
-  private int copyToUserBuffer(byte[] b, int off, int len){
+  /**
+   * Copy data from internal buffer to user buffer.
+   * @param b user buffer
+   * @param off offset
+   * @param len length
+   * @return number of bytes copied
+   */
+  protected int copyToUserBuffer(byte[] b, int off, int len){
     //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
     //(bytes returned may be less than requested)
     int bytesRemaining = limit - bCursor;
@@ -511,7 +498,17 @@ private int copyToUserBuffer(byte[] b, int off, int len){
     return bytesToRead;
   }
 
-  private int readInternal(final long position, final byte[] b, final int offset, final int length,
+  /**
+   * Internal read method which handles read-ahead logic.
+   * @param position to read from
+   * @param b buffer
+   * @param offset in buffer
+   * @param length to read
+   * @param bypassReadAhead whether to bypass read-ahead
+   * @return number of bytes read
+   * @throws IOException if there is an error
+   */
+  protected int readInternal(final long position, final byte[] b, final int offset, final int length,
                            final boolean bypassReadAhead) throws IOException {
     if (isReadAheadEnabled() && !bypassReadAhead) {
       // try reading from read-ahead
@@ -728,6 +725,10 @@ public synchronized long getPos() throws IOException {
     return nextReadPos < 0 ? 0 : nextReadPos;
   }
 
+  /**
+   * Get the tracing context associated with this stream.
+   * @return the tracing context
+   */
   public TracingContext getTracingContext() {
     return tracingContext;
   }
@@ -797,10 +798,22 @@ public boolean hasCapability(String capability) {
     return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability));
   }
 
-  byte[] getBuffer() {
+  /**
+   * Getter for buffer.
+   * @return the buffer
+   */
+  synchronized byte[] getBuffer() {
     return buffer;
   }
 
+  /**
+   * Setter for buffer.
+   * @param buffer the buffer to set
+   */
+  protected synchronized void setBuffer(byte[] buffer) {
+    this.buffer = buffer;
+  }
+
   /**
    * Checks if any version of read ahead is enabled.
    * If both are disabled, then skip read ahead logic.
@@ -811,21 +824,38 @@ public boolean isReadAheadEnabled() {
     return (readAheadEnabled || readAheadV2Enabled) && getReadBufferManager() != null;
   }
 
+  /**
+   * Getter for user configured read ahead range.
+   * @return the read ahead range in int.
+   */
   @VisibleForTesting
   public int getReadAheadRange() {
     return readAheadRange;
   }
 
+  /**
+   * Setter for cachedSasToken.
+   * @param cachedSasToken the cachedSasToken to set
+   */
   @VisibleForTesting
   protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
     this.cachedSasToken = cachedSasToken;
   }
 
+  /**
+   * Getter for inputStreamId.
+   * @return the inputStreamId
+   */
   @VisibleForTesting
   public String getStreamID() {
     return inputStreamId;
   }
 
+  /**
+   * Getter for eTag.
+   *
+   * @return the eTag
+   */
   public String getETag() {
     return eTag;
   }
@@ -840,6 +870,10 @@ public AbfsInputStreamStatistics getStreamStatistics() {
     return streamStatistics;
   }
 
+  /**
+   * Register a listener for this stream.
+   * @param listener1 the listener to register
+   */
   @VisibleForTesting
   public void registerListener(Listener listener1) {
     listener = listener1;
@@ -866,26 +900,46 @@ public long getBytesFromRemoteRead() {
     return bytesFromRemoteRead;
   }
 
+  /**
+   * Getter for buffer size.
+   * @return the buffer size
+   */
   @VisibleForTesting
   public int getBufferSize() {
     return bufferSize;
   }
 
+  /**
+   * Getter for footer read buffer size.
+   * @return the footer read buffer size
+   */
   @VisibleForTesting
   protected int getFooterReadBufferSize() {
     return footerReadSize;
   }
 
+  /**
+   * Getter for read ahead queue depth.
+   * @return the read ahead queue depth
+   */
   @VisibleForTesting
   public int getReadAheadQueueDepth() {
     return readAheadQueueDepth;
   }
 
+  /**
+   * Getter for alwaysReadBufferSize.
+   * @return the alwaysReadBufferSize
+   */
   @VisibleForTesting
   public boolean shouldAlwaysReadBufferSize() {
     return alwaysReadBufferSize;
   }
 
+  /**
+   * Get the IOStatistics for the stream.
+   * @return IOStatistics
+   */
   @Override
   public IOStatistics getIOStatistics() {
     return ioStatistics;
@@ -907,48 +961,131 @@ public String toString() {
     return sb.toString();
   }
 
+  /**
+   * Getter for bCursor.
+   * @return the bCursor
+   */
   @VisibleForTesting
-  int getBCursor() {
+  synchronized int getBCursor() {
     return this.bCursor;
   }
 
+  /**
+   * Setter for bCursor.
+   * @param bCursor the bCursor to set
+   */
+  protected synchronized void setBCursor(int bCursor) {
+    this.bCursor = bCursor;
+  }
+
+  /**
+   * Getter for fCursor.
+   * @return the fCursor
+   */
   @VisibleForTesting
-  long getFCursor() {
+  synchronized long getFCursor() {
     return this.fCursor;
   }
 
+  /**
+   * Setter for fCursor.
+   * @param fCursor the fCursor to set
+   */
+  protected synchronized void setFCursor(long fCursor) {
+    this.fCursor = fCursor;
+  }
+
+  /**
+   * Getter for fCursorAfterLastRead.
+   * @return the fCursorAfterLastRead
+   */
   @VisibleForTesting
-  long getFCursorAfterLastRead() {
+  synchronized long getFCursorAfterLastRead() {
     return this.fCursorAfterLastRead;
   }
 
+  /**
+   * Setter for fCursorAfterLastRead.
+   * @param fCursorAfterLastRead the fCursorAfterLastRead to set
+   */
+  protected synchronized void setFCursorAfterLastRead(long fCursorAfterLastRead) {
+    this.fCursorAfterLastRead = fCursorAfterLastRead;
+  }
+
+  /**
+   * Getter for limit.
+   * @return the limit
+   */
   @VisibleForTesting
-  long getLimit() {
+  synchronized int getLimit() {
     return this.limit;
   }
 
+  /**
+   * Setter for limit.
+   * @param limit the limit to set
+   */
+  protected synchronized void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+  /**
+   * Getter for firstRead.
+   * @return the firstRead
+   */
   boolean isFirstRead() {
     return this.firstRead;
   }
 
+  /**
+   * Setter for firstRead.
+   * @param firstRead the firstRead to set
+   */
+  protected void setFirstRead(boolean firstRead) {
+    this.firstRead = firstRead;
+  }
+
+  /**
+   * Getter for fsBackRef.
+   * @return the fsBackRef
+   */
   @VisibleForTesting
   BackReference getFsBackRef() {
     return fsBackRef;
   }
 
+  /**
+   * Getter for readBufferManager.
+   * @return the readBufferManager
+   */
   @VisibleForTesting
   ReadBufferManager getReadBufferManager() {
     return readBufferManager;
   }
 
+  /**
+   * Minimum seek distance for vector reads.
+   * @return the minimum seek distance
+   */
   @Override
   public int minSeekForVectorReads() {
     return S_128K;
   }
 
+  /**
+   * Maximum read size for vector reads.
+   * @return the maximum read size
+   */
   @Override
   public int maxReadSizeForVectorReads() {
     return S_2M;
   }
 
+  /**
+   * Getter for contentLength.
+   * @return the contentLength
+   */
+  protected long getContentLength() {
+    return contentLength;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java
index fa4b0ac209d..aab6f3d5510 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java
@@ -22,6 +22,7 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +71,7 @@ public final class AbfsLease {
   private volatile boolean leaseFreed;
   private volatile String leaseID = null;
   private volatile Throwable exception = null;
-  private volatile int acquireRetryCount = 0;
+  private AtomicInteger acquireRetryCount = new AtomicInteger(0);
   private volatile ListenableScheduledFuture<AbfsRestOperation> future = null;
   private final long leaseRefreshDuration;
   private final int leaseRefreshDurationInSeconds;
@@ -197,7 +198,7 @@ public void onFailure(Throwable throwable) {
           if (RetryPolicy.RetryAction.RetryDecision.RETRY
               == retryPolicy.shouldRetry(null, numRetries, 0, true).action) {
             LOG.debug("Failed to acquire lease on {}, retrying: {}", path, 
throwable);
-            acquireRetryCount++;
+            acquireRetryCount.incrementAndGet();
             acquireLease(retryPolicy, numRetries + 1, retryInterval,
                 retryInterval, eTag, tracingContext);
           } else {
@@ -289,7 +290,7 @@ public String getLeaseID() {
    */
   @VisibleForTesting
   public int getAcquireRetryCount() {
-    return acquireRetryCount;
+    return acquireRetryCount.get();
   }
 
   /**
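
The switch away from a volatile int matters because count++ is a read-modify-write, not an
atomic operation, so concurrent retry callbacks could lose increments. A minimal illustrative
sketch, not part of this patch:

    import java.util.concurrent.atomic.AtomicInteger;

    // With a volatile int, two threads can both read the same value and write back
    // the same result, losing one increment. AtomicInteger makes it one atomic step.
    AtomicInteger acquireRetryCount = new AtomicInteger(0);
    Runnable onRetry = acquireRetryCount::incrementAndGet; // safe under concurrency
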
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java
new file mode 100644
index 00000000000..c0343ca724e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for prefetching data.
+ * This implementation always prefetches data in advance if enabled
+ * to optimize for sequential read patterns.
+ */
+public class AbfsPrefetchInputStream extends AbfsInputStream {
+
+  /**
+   * Constructs AbfsPrefetchInputStream.
+   * @param client AbfsClient to be used for read operations
+   * @param statistics to record input stream statistics
+   * @param path file path
+   * @param contentLength file content length
+   * @param abfsInputStreamContext input stream context
+   * @param eTag file eTag
+   * @param tracingContext tracing context to trace the read operations
+   */
+  public AbfsPrefetchInputStream(
+      final AbfsClient client,
+      final FileSystem.Statistics statistics,
+      final String path,
+      final long contentLength,
+      final AbfsInputStreamContext abfsInputStreamContext,
+      final String eTag,
+      TracingContext tracingContext) {
+    super(client, statistics, path, contentLength,
+            abfsInputStreamContext, eTag, tracingContext);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    // If buffer is empty, then fill the buffer.
+    if (getBCursor() == getLimit()) {
+      // If EOF, then return -1
+      if (getFCursor() >= getContentLength()) {
+        return -1;
+      }
+
+      long bytesRead = 0;
+      // reset buffer to initial state - i.e., throw away existing data
+      setBCursor(0);
+      setLimit(0);
+      if (getBuffer() == null) {
+        LOG.debug("created new buffer size {}", getBufferSize());
+        setBuffer(new byte[getBufferSize()]);
+      }
+
+      /*
+       * Always start with prefetch, even from the first read.
+       * Even if an out-of-order seek comes, prefetches will be triggered for the next set of blocks.
+       */
+      bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false);
+      if (isFirstRead()) {
+        setFirstRead(false);
+      }
+      if (bytesRead == -1) {
+        return -1;
+      }
+
+      setLimit(getLimit() + (int) bytesRead);
+      setFCursor(getFCursor() + bytesRead);
+      setFCursorAfterLastRead(getFCursor());
+    }
+    return copyToUserBuffer(b, off, len);
+  }
+}
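
As a usage note, a minimal sketch (same assumptions as the earlier configuration example) of
selecting this prefetching stream:

    // "sequential" and "whole-file" both map to SEQUENTIAL, i.e. AbfsPrefetchInputStream.
    conf.set("fs.azure.read.policy", "sequential");
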
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java
new file mode 100644
index 00000000000..b484cc6c843
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for random read patterns.
+ * This implementation disables prefetching of data blocks; instead, it only
+ * reads ahead for a small range beyond what is requested by the caller.
+ */
+public class AbfsRandomInputStream extends AbfsInputStream {
+
+  /**
+   * Constructs AbfsRandomInputStream.
+   * @param client AbfsClient to be used for read operations
+   * @param statistics to record input stream statistics
+   * @param path file path
+   * @param contentLength file content length
+   * @param abfsInputStreamContext input stream context
+   * @param eTag file eTag
+   * @param tracingContext tracing context to trace the read operations
+   */
+  public AbfsRandomInputStream(
+      final AbfsClient client,
+      final FileSystem.Statistics statistics,
+      final String path,
+      final long contentLength,
+      final AbfsInputStreamContext abfsInputStreamContext,
+      final String eTag,
+      TracingContext tracingContext) {
+    super(client, statistics, path, contentLength,
+        abfsInputStreamContext, eTag, tracingContext);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected int readOneBlock(final byte[] b, final int off, final int len)
+      throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    // If buffer is empty, then fill the buffer.
+    if (getBCursor() == getLimit()) {
+      // If EOF, then return -1
+      if (getFCursor() >= getContentLength()) {
+        return -1;
+      }
+
+      long bytesRead = 0;
+      // reset buffer to initial state - i.e., throw away existing data
+      setBCursor(0);
+      setLimit(0);
+      if (getBuffer() == null) {
+        LOG.debug("created new buffer size {}", getBufferSize());
+        setBuffer(new byte[getBufferSize()]);
+      }
+
+      /*
+       * Disable queuing prefetches when random read pattern detected.
+       * Instead, read ahead only for readAheadRange above what is asked by caller.
+       */
+      getTracingContext().setReadType(ReadType.RANDOM_READ);
+      int lengthWithReadAhead = Math.min(b.length + getReadAheadRange(), getBufferSize());
+      LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead);
+      bytesRead = readInternal(getFCursor(), getBuffer(), 0, lengthWithReadAhead, true);
+      if (isFirstRead()) {
+        setFirstRead(false);
+      }
+      if (bytesRead == -1) {
+        return -1;
+      }
+
+      setLimit(getLimit() + (int) bytesRead);
+      setFCursor(getFCursor() + bytesRead);
+      setFCursorAfterLastRead(getFCursor());
+    }
+    return copyToUserBuffer(b, off, len);
+  }
+}
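
For intuition, the bounded read-ahead above amounts to the following helper; a minimal sketch
with simplified names, not part of this patch:

    // A random read fetches only the caller's requested length plus readAheadRange,
    // capped at the block buffer size, rather than prefetching whole blocks.
    static int randomReadLength(int requestedLength, int readAheadRange, int bufferSize) {
      return Math.min(requestedLength + readAheadRange, bufferSize);
    }
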
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java
new file mode 100644
index 00000000000..bdd895a39de
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Locale;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+
+/**
+ * Enum for ABFS Input Policies.
+ * Each policy maps to a particular implementation of {@link AbfsInputStream}.
+ */
+public enum AbfsReadPolicy {
+
+  SEQUENTIAL(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL),
+  RANDOM(FS_OPTION_OPENFILE_READ_POLICY_RANDOM),
+  ADAPTIVE(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);
+
+  private final String readPolicy;
+
+  AbfsReadPolicy(String readPolicy) {
+    this.readPolicy = readPolicy;
+  }
+
+  @Override
+  public String toString() {
+    return readPolicy;
+  }
+
+  /**
+   * Get the enum constant from the string name.
+   * @param name policy name as configured by user
+   * @return the corresponding AbfsReadPolicy to be used
+   */
+  public static AbfsReadPolicy getAbfsReadPolicy(String name) {
+    String readPolicyStr = name.trim().toLowerCase(Locale.ENGLISH);
+    switch (readPolicyStr) {
+    // all these options currently map to random IO.
+    case FS_OPTION_OPENFILE_READ_POLICY_RANDOM:
+    case FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR:
+    case FS_OPTION_OPENFILE_READ_POLICY_ORC:
+    case FS_OPTION_OPENFILE_READ_POLICY_PARQUET:
+      return RANDOM;
+
+    // handle the sequential formats.
+    case FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL:
+    case FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE:
+      return SEQUENTIAL;
+
+    // Everything else including ABFS Default Policy maps to Adaptive
+    case FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE:
+    default:
+      return ADAPTIVE;
+    }
+  }
+}
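
To summarize the mapping, a few illustrative calls (the string values are the Hadoop
OpenFileOptions read-policy names; a sketch, not part of this patch):

    AbfsReadPolicy.getAbfsReadPolicy("parquet");    // -> RANDOM (columnar formats)
    AbfsReadPolicy.getAbfsReadPolicy("whole-file"); // -> SEQUENTIAL
    AbfsReadPolicy.getAbfsReadPolicy("adaptive");   // -> ADAPTIVE
    AbfsReadPolicy.getAbfsReadPolicy("unknown");    // -> ADAPTIVE (the fallback)
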
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java
index f3e1e582f9d..7164e55e90c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java
@@ -41,6 +41,11 @@ public abstract class AbfsRetryPolicy {
    */
   private final String retryPolicyAbbreviation;
 
+  /**
+   * Constructor to initialize max retry count and abbreviation.
+   * @param maxRetryCount maximum retry count
+   * @param retryPolicyAbbreviation abbreviation for retry policy
+   */
+  protected AbfsRetryPolicy(final int maxRetryCount, final String retryPolicyAbbreviation) {
     this.maxRetryCount = maxRetryCount;
     this.retryPolicyAbbreviation = retryPolicyAbbreviation;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 712b04fb499..5b53d641a20 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -23,7 +23,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
-import java.util.Stack;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
@@ -45,7 +44,6 @@ public abstract class ReadBufferManager {
   private static int thresholdAgeMilliseconds;
   private static int blockSize = DEFAULT_READ_AHEAD_BLOCK_SIZE; // default block size for read-ahead in bytes
 
-  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] array that are available
   private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
   private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
   private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
@@ -200,15 +198,6 @@ protected static void setReadAheadBlockSize(int readAheadBlockSize) {
     blockSize = readAheadBlockSize;
   }
 
-  /**
-   * Gets the stack of free buffer indices.
-   *
-   * @return the stack of free buffer indices
-   */
-  Stack<Integer> getFreeList() {
-    return freeList;
-  }
-
   /**
    * Gets the queue of read-ahead requests.
    *
@@ -243,9 +232,7 @@ LinkedList<ReadBuffer> getCompletedReadList() {
    * @return a list of free buffer indices
    */
   @VisibleForTesting
-  List<Integer> getFreeListCopy() {
-    return new ArrayList<>(freeList);
-  }
+  abstract List<Integer> getFreeListCopy();
 
   /**
    * Gets a copy of the read-ahead queue.
@@ -294,7 +281,9 @@ int getCompletedReadListSize() {
    */
   @VisibleForTesting
   protected void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
-    freeList.clear();
+    clearFreeList();
     completedReadList.add(buf);
   }
+
+  abstract void clearFreeList();
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
index 240a6186666..c034d856596 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
@@ -24,6 +24,8 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -42,6 +44,7 @@ public final class ReadBufferManagerV1 extends ReadBufferManager {
 
   private Thread[] threads = new Thread[NUM_THREADS];
   private byte[][] buffers;
+  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] array that are available
   private static ReadBufferManagerV1 bufferManager;
 
   // hide instance constructor
@@ -607,7 +610,21 @@ void resetBufferManager() {
     setBufferManager(null); // reset the singleton instance
   }
 
+  @Override
+  protected List<Integer> getFreeListCopy() {
+    return new ArrayList<>(freeList);
+  }
+
+  private Stack<Integer> getFreeList() {
+    return freeList;
+  }
+
   private static void setBufferManager(ReadBufferManagerV1 manager) {
     bufferManager = manager;
   }
+
+  @Override
+  protected void clearFreeList() {
+    getFreeList().clear();
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index 7f276eb77d8..5cbe4893b12 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -27,7 +27,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Stack;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -100,7 +100,12 @@ public final class ReadBufferManagerV2 extends ReadBufferManager {
 
   private byte[][] bufferPool;
 
-  private final Stack<Integer> removedBufferList = new Stack<>();
+  /*
+   * List of buffer indexes that are currently free and can be assigned to new read-ahead requests.
+   * Using a thread-safe data structure as multiple threads can access this concurrently.
+   */
+  private final ConcurrentSkipListSet<Integer> removedBufferList = new ConcurrentSkipListSet<>();
+  private ConcurrentSkipListSet<Integer> freeList = new ConcurrentSkipListSet<>();
 
   private ScheduledExecutorService memoryMonitorThread;
 
@@ -209,7 +214,7 @@ void init() {
       // Start with just minimum number of buffers.
       bufferPool[i]
           = new byte[getReadAheadBlockSize()];  // same buffers are reused. The byte array never goes back to GC
-      getFreeList().add(i);
+      pushToFreeList(i);
       numberOfActiveBuffers.getAndIncrement();
     }
     memoryMonitorThread = Executors.newSingleThreadScheduledExecutor(
@@ -768,12 +773,17 @@ private synchronized boolean tryMemoryUpscale() {
     if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
       // Create and Add more buffers in getFreeList().
       int nextIndx = getNumBuffers();
-      if (removedBufferList.isEmpty() && nextIndx < bufferPool.length) {
+      if (removedBufferList.isEmpty()) {
+        if (nextIndx >= bufferPool.length) {
+          printTraceLog("Invalid next index: {}. Current buffer pool size: {}",
+              nextIndx, bufferPool.length);
+          return false;
+        }
         bufferPool[nextIndx] = new byte[getReadAheadBlockSize()];
         pushToFreeList(nextIndx);
       } else {
         // Reuse a removed buffer index.
-        int freeIndex = removedBufferList.pop();
+        int freeIndex = removedBufferList.pollFirst();
         if (freeIndex >= bufferPool.length || bufferPool[freeIndex] != null) {
           printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
               freeIndex, bufferPool.length);
@@ -811,7 +821,7 @@ > getThresholdAgeMilliseconds()) {
     }
 
     double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
-    if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
+    if (isDynamicScalingEnabled && memoryLoad > memoryThreshold && getNumBuffers() > minBufferPoolSize) {
       synchronized (this) {
         if (isFreeListEmpty()) {
           printTraceLog(
@@ -980,7 +990,7 @@ public void testResetReadBufferManager() {
       getReadAheadQueue().clear();
       getInProgressList().clear();
       getCompletedReadList().clear();
-      getFreeList().clear();
+      clearFreeList();
       for (int i = 0; i < maxBufferPoolSize; i++) {
         bufferPool[i] = null;
       }
@@ -1023,6 +1033,16 @@ void resetBufferManager() {
     setIsConfigured(false);
   }
 
+  @Override
+  protected List<Integer> getFreeListCopy() {
+    return new ArrayList<>(freeList);
+  }
+
+  @Override
+  protected void clearFreeList() {
+    freeList.clear();
+  }
+
   private static void setBufferManager(ReadBufferManagerV2 manager) {
     bufferManager = manager;
   }
@@ -1062,11 +1082,20 @@ public int getMinBufferPoolSize() {
     return minBufferPoolSize;
   }
 
+  @VisibleForTesting
+  public void setMinBufferPoolSize(int size) {
+    this.minBufferPoolSize = size;
+  }
+
   @VisibleForTesting
   public int getMaxBufferPoolSize() {
     return maxBufferPoolSize;
   }
 
+  /**
+   * Gets the current worker thread pool size.
+   * @return the current number of worker threads
+   */
   @VisibleForTesting
   public int getCurrentThreadPoolSize() {
     return workerRefs.size();
@@ -1082,6 +1111,10 @@ public int getMemoryMonitoringIntervalInMilliSec() {
     return memoryMonitoringIntervalInMilliSec;
   }
 
+  /**
+   * Returns the scheduled executor service used for CPU monitoring.
+   * @return the ScheduledExecutorService for CPU monitoring tasks
+   */
   @VisibleForTesting
   public ScheduledExecutorService getCpuMonitoringThread() {
     return cpuMonitorThread;
@@ -1098,6 +1131,13 @@ public long getMaxJvmCpuUtilization() {
     return maxJvmCpuUtilization;
   }
 
+  /**
+   * Calculates the required thread pool size based on the current
+   * read-ahead queue size and in-progress list size, applying a buffer
+   * to accommodate workload fluctuations.
+   *
+   * @return the calculated required thread pool size
+   */
   public int getRequiredThreadPoolSize() {
     return (int) Math.ceil(THREAD_POOL_REQUIREMENT_BUFFER
         * (getReadAheadQueue().size()
@@ -1105,30 +1145,15 @@ public int getRequiredThreadPoolSize() {
   }
 
   private boolean isFreeListEmpty() {
-    LOCK.lock();
-    try {
-      return getFreeList().isEmpty();
-    } finally {
-      LOCK.unlock();
-    }
+    return this.freeList.isEmpty();
   }
 
   private Integer popFromFreeList() {
-    LOCK.lock();
-    try {
-      return getFreeList().pop();
-    } finally {
-      LOCK.unlock();
-    }
+    return this.freeList.pollFirst();
   }
 
   private void pushToFreeList(int idx) {
-    LOCK.lock();
-    try {
-      getFreeList().push(idx);
-    } finally {
-      LOCK.unlock();
-    }
+    this.freeList.add(idx);
   }
 
   private void incrementActiveBufferCount() {
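
The free-list rework above replaces an externally locked Stack with a ConcurrentSkipListSet
whose operations are individually atomic; a minimal sketch of the resulting pattern, not part
of this patch:

    import java.util.concurrent.ConcurrentSkipListSet;

    ConcurrentSkipListSet<Integer> freeList = new ConcurrentSkipListSet<>();
    freeList.add(3);                    // pushToFreeList(3): no external lock needed
    freeList.add(1);
    Integer idx = freeList.pollFirst(); // atomically removes the lowest index (1);
                                        // returns null when the set is empty
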
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index 6b87f1b73ef..c215094eca0 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -26,6 +26,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsAdaptiveInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
@@ -275,7 +276,7 @@ public void testWithNullStreamStatistics() throws IOException {
               getTestTracingContext(fs, false), null);
 
       // AbfsInputStream with no StreamStatistics.
-      in = new AbfsInputStream(fs.getAbfsClient(), null,
+      in = new AbfsAdaptiveInputStream(fs.getAbfsClient(), null,
           nullStatFilePath.toUri().getPath(), ONE_KB, abfsInputStreamContext,
           abfsRestOperation.getResult().getResponseHeader("ETag"),
           getTestTracingContext(fs, false));
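
The test above now constructs AbfsAdaptiveInputStream, one of the policy-specific subclasses this commit introduces, where it previously instantiated AbfsInputStream directly. The policy-to-class wiring lives in AzureBlobFileSystemStore (changed by this commit but not part of this excerpt); the mapping below is a sketch inferred from testAbfsInputStreamInstance() further down, with the policy strings taken from org.apache.hadoop.fs.Options.OpenFileOptions:

    // Sketch only -- not the committed code. Assume this sits in
    // org.apache.hadoop.fs.azurebfs.services alongside the stream classes.
    static Class<? extends AbfsInputStream> streamClassFor(String policy) {
      switch (policy) {
        case "sequential":  // FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL
          return AbfsPrefetchInputStream.class;   // always queues read-aheads
        case "parquet":     // FS_OPTION_OPENFILE_READ_POLICY_PARQUET
          return AbfsRandomInputStream.class;     // never queues prefetches
        case "adaptive":    // FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE
        case "avro":        // FS_OPTION_OPENFILE_READ_POLICY_AVRO
        default:
          return AbfsAdaptiveInputStream.class;   // detects the pattern at runtime
      }
    }
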
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
index 938f5f4300c..2d8629294f6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
@@ -28,11 +28,11 @@
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamTestUtils.HUNDRED;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -117,7 +117,7 @@ public void testAzureBlobFileSystemBackReferenceInInputStream()
         FSDataInputStream in = getFileSystem().open(path)) {
       AbfsInputStream abfsInputStream = (AbfsInputStream) in.getWrappedStream();
 
-      Assertions.assertThat(abfsInputStream.getFsBackRef().isNull())
+      assertThat(abfsInputStream.getFsBackRef().isNull())
           .describedAs("BackReference in input stream should not be null")
           .isFalse();
     }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 93df6529cb8..5cf0bd473fc 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -57,6 +58,10 @@
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderVersion;
 import org.apache.hadoop.fs.impl.OpenFileParameters;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_AVRO;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT;
@@ -67,6 +72,7 @@
 import static org.apache.hadoop.fs.azurebfs.constants.ReadType.MISSEDCACHE_READ;
 import static org.apache.hadoop.fs.azurebfs.constants.ReadType.NORMAL_READ;
 import static org.apache.hadoop.fs.azurebfs.constants.ReadType.PREFETCH_READ;
+import static org.apache.hadoop.fs.azurebfs.constants.ReadType.RANDOM_READ;
 import static org.apache.hadoop.fs.azurebfs.constants.ReadType.SMALLFILE_READ;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -92,8 +98,7 @@
 /**
  * Unit test AbfsInputStream.
  */
-public class TestAbfsInputStream extends
-    AbstractAbfsIntegrationTest {
+public class TestAbfsInputStream extends AbstractAbfsIntegrationTest {
 
   private static final int ONE_KB = 1 * 1024;
   private static final int TWO_KB = 2 * 1024;
@@ -148,7 +153,7 @@ AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
       String fileName) throws IOException {
     AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
     // Create AbfsInputStream with the client instance
-    AbfsInputStream inputStream = new AbfsInputStream(
+    AbfsInputStream inputStream = new AbfsAdaptiveInputStream(
         mockAbfsClient,
         null,
         FORWARD_SLASH + fileName,
@@ -176,7 +181,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
       int readAheadBlockSize) throws IOException {
     AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
     // Create AbfsInputStream with the client instance
-    AbfsInputStream inputStream = new AbfsInputStream(
+    AbfsInputStream inputStream = new AbfsAdaptiveInputStream(
         abfsClient,
         null,
         FORWARD_SLASH + fileName,
@@ -848,6 +853,7 @@ public void testReadTypeInTracingContextHeader() throws Exception {
     fileSize = 3 * ONE_MB; // To make sure multiple blocks are read with MR
     totalReadCalls += 3; // 3 block of 1MB.
     Mockito.doReturn(0).when(spiedConfig).getReadAheadQueueDepth();
+    Mockito.doReturn(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL).when(spiedConfig).getAbfsReadPolicy();
     doReturn(true).when(spiedConfig).isReadAheadEnabled();
     testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, MISSEDCACHE_READ, 3, totalReadCalls);
 
@@ -881,6 +887,15 @@ public void testReadTypeInTracingContextHeader() throws Exception {
     doReturn(false).when(spiedConfig).optimizeFooterRead();
     testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, SMALLFILE_READ, 1, totalReadCalls);
 
+    /*
+     * Test to verify Random Read Type.
+     * Setting Read Policy to Parquet ensures Random Read Type.
+     */
+    fileSize = 3 * ONE_MB; // To make sure multiple blocks are read.
+    totalReadCalls += 3; // Full file will be read along with footer.
+    doReturn(FS_OPTION_OPENFILE_READ_POLICY_PARQUET).when(spiedConfig).getAbfsReadPolicy();
+    testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, RANDOM_READ, 1, totalReadCalls);
+
     /*
      * Test to verify Direct Read Type and a read from random position.
      * Separate AbfsInputStream method needs to be called.
@@ -904,11 +919,11 @@ public void testReadTypeInTracingContextHeader() throws Exception {
   private void testReadTypeInTracingContextHeaderInternal(AzureBlobFileSystem fs,
       int fileSize, ReadType readType, int numOfReadCalls, int totalReadCalls) throws Exception {
     Path testPath = createTestFile(fs, fileSize);
-    readFile(fs, testPath, fileSize);
+    readFile(fs, testPath, fileSize, readType);
     assertReadTypeInClientRequestId(fs, numOfReadCalls, totalReadCalls, readType);
   }
 
-  /*
+  /**
    * Test to verify that both conditions of prefetch read and respective config
   * enabled need to be true for the priority header to be added.
    */
@@ -937,6 +952,101 @@ public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs()
     executePrefetchReadTest(tracingContext1, configuration1, false);
   }
 
+  /**
+   * Test to verify that the correct AbfsInputStream instance is created
+   * based on the read policy set in AbfsConfiguration.
+   */
+  @Test
+  public void testAbfsInputStreamInstance() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path path = new Path("/testPath");
+    fs.create(path).close();
+
+    // Assert that Sequential Read Policy uses Prefetch Input Stream
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+    InputStream stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsPrefetchInputStream.class);
+    stream.close();
+
+    // Assert that Adaptive Read Policy uses Adaptive Input Stream
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);
+    stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class);
+    stream.close();
+
+    // Assert that Parquet Read Policy uses Random Input Stream
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_PARQUET);
+    stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsRandomInputStream.class);
+    stream.close();
+
+    // Assert that Avro Read Policy uses Adaptive Input Stream
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_AVRO);
+    stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class);
+    stream.close();
+  }
+
+  /**
+   * Test to verify that Random Input Stream does not queue prefetches.
+   * @throws Exception if any error occurs during the test
+   */
+  @Test
+  public void testRandomInputStreamDoesNotQueuePrefetches() throws Exception {
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    AbfsConfiguration spiedConfig = Mockito.spy(spiedStore.getAbfsConfiguration());
+    AbfsClient spiedClient = Mockito.spy(spiedStore.getClient());
+    Mockito.doReturn(ONE_MB).when(spiedConfig).getReadBufferSize();
+    Mockito.doReturn(ONE_MB).when(spiedConfig).getReadAheadBlockSize();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedConfig).when(spiedStore).getAbfsConfiguration();
+
+    int fileSize = 3 * ONE_MB; // To make sure multiple blocks are read.
+    int totalReadCalls = 3;
+    Mockito.doReturn(3).when(spiedConfig).getReadAheadQueueDepth();
+    Mockito.doReturn(FS_OPTION_OPENFILE_READ_POLICY_PARQUET).when(spiedConfig).getAbfsReadPolicy();
+    testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, RANDOM_READ, 3, totalReadCalls);
+  }
+
+  /**
+   * Test to verify that Adaptive Input Stream queues prefetches for in-order reads
+   * and performs random reads for out-of-order seeks.
+   * @throws Exception if any error occurs during the test
+   */
+  @Test
+  public void testAdaptiveInputStream() throws Exception {
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    AbfsConfiguration spiedConfig = Mockito.spy(spiedStore.getAbfsConfiguration());
+    AbfsClient spiedClient = Mockito.spy(spiedStore.getClient());
+    Mockito.doReturn(ONE_MB).when(spiedConfig).getReadBufferSize();
+    Mockito.doReturn(ONE_MB).when(spiedConfig).getReadAheadBlockSize();
+    Mockito.doReturn(ONE_KB).when(spiedConfig).getReadAheadRange();
+    Mockito.doReturn(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE).when(spiedConfig).getAbfsReadPolicy();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedConfig).when(spiedStore).getAbfsConfiguration();
+
+    int fileSize = 10 * ONE_MB;
+    Path testPath = createTestFile(spiedFs, fileSize);
+
+    try (FSDataInputStream iStream = spiedFs.open(testPath)) {
+      assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsAdaptiveInputStream.class);
+
+      // In-order reads trigger prefetches in the adaptive stream.
+      int bytesRead = iStream.read(new byte[2 * ONE_MB], 0, 2 * ONE_MB);
+      assertReadTypeInClientRequestId(spiedFs, 3, 3, PREFETCH_READ);
+      assertThat(bytesRead).isEqualTo(2 * ONE_MB);
+
+      // An out-of-order seek falls back to a random read.
+      iStream.seek(7 * ONE_MB);
+      bytesRead = iStream.read(new byte[ONE_MB / 2], 0, ONE_MB / 2);
+      assertReadTypeInClientRequestId(spiedFs, 1, 4, RANDOM_READ);
+    }
+  }
+
   /*
    * Helper method to execute a read and verify whether the priority header is
    * added as expected.
    */
@@ -1005,8 +1115,15 @@ private Path createTestFile(AzureBlobFileSystem fs, int fileSize) throws Excepti
     return testPath;
   }
 
-  private void readFile(AzureBlobFileSystem fs, Path testPath, int fileSize) throws Exception {
+  private void readFile(AzureBlobFileSystem fs, Path testPath, int fileSize, ReadType readType) throws Exception {
     try (FSDataInputStream iStream = fs.open(testPath)) {
+      if (readType == PREFETCH_READ || readType == MISSEDCACHE_READ) {
+        assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsPrefetchInputStream.class);
+      } else if (readType == NORMAL_READ) {
+        assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsAdaptiveInputStream.class);
+      } else if (readType == RANDOM_READ) {
+        assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsRandomInputStream.class);
+      }
       int bytesRead = iStream.read(new byte[fileSize], 0,
           fileSize);
       assertThat(fileSize)
@@ -1027,6 +1144,7 @@ private void assertReadTypeInClientRequestId(AzureBlobFileSystem fs, int numOfRe
     ArgumentCaptor<ContextEncryptionAdapter> captor8 = ArgumentCaptor.forClass(ContextEncryptionAdapter.class);
     ArgumentCaptor<TracingContext> captor9 = ArgumentCaptor.forClass(TracingContext.class);
 
+    List<String> paths = captor1.getAllValues();
     verify(fs.getAbfsStore().getClient(), times(totalReadCalls)).read(
         captor1.capture(), captor2.capture(), captor3.capture(),
         captor4.capture(), captor5.capture(), captor6.capture(),
@@ -1071,8 +1189,14 @@ private void verifyHeaderForReadTypeInTracingContextHeader(TracingContext tracin
     }
     assertThat(idList[OPERATION_INDEX]).describedAs("Operation Type Should Be Read")
         .isEqualTo(FSOperationType.READ.toString());
-    assertThat(idList[READTYPE_INDEX]).describedAs("Read type in tracing context header should match")
-        .isEqualTo(readType.toString());
+    if (readType == PREFETCH_READ) {
+      // A prefetch read may be reported as a missed-cache read as well.
+      assertThat(idList[READTYPE_INDEX]).describedAs("Read type in tracing context header should match")
+          .isIn(PREFETCH_READ.toString(), MISSEDCACHE_READ.toString());
+    } else {
+      assertThat(idList[READTYPE_INDEX]).describedAs("Read type in tracing context header should match")
+          .isEqualTo(readType.toString());
+    }
   }
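
For context, a client can request a read pattern in two ways: through the new ABFS configuration key (constant FS_AZURE_READ_POLICY; its literal string is not visible in this excerpt, so the value below is an assumption) or through the standard openFile() builder option fs.option.openfile.read.policy. The tests above drive the policy through AbfsConfiguration; whether this commit also wires the openFile() option into the stream selection is not shown here. A usage sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadPolicyUsage {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed key string; the real constant is ConfigurationKeys.FS_AZURE_READ_POLICY.
        conf.set("fs.azure.read.policy", "parquet");

        Path path = new Path("abfs://container@account.dfs.core.windows.net/data/part-0.parquet");
        try (FileSystem fs = path.getFileSystem(conf);
             FSDataInputStream in = fs.openFile(path)
                 .opt("fs.option.openfile.read.policy", "parquet") // standard Hadoop option
                 .build().get()) {
          byte[] buf = new byte[8];
          // Positioned read; under a parquet policy no prefetches are queued.
          in.readFully(0, buf);
        }
      }
    }
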
 
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
index e94c535bd39..a7cbc83f0c7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
@@ -268,6 +268,7 @@ public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {
     ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
     int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
     assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+    bufferManagerV2.setMinBufferPoolSize(initialBuffers - 5); // allow downscale
     running = true;
     Thread t = new Thread(() -> {
       while (running) {
@@ -314,6 +315,7 @@ public void testReadMetricUpdation() throws Exception {
           bufferManagerV2.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad());
       int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
       assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+      bufferManagerV2.setMinBufferPoolSize(initialBuffers - 5); // allow downscale
       running = true;
       Thread t = new Thread(() -> {
         while (running) {
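
Both tests here lower minBufferPoolSize before expecting a downscale, which matches the tightened guard in ReadBufferManagerV2 earlier in this diff: buffers are reclaimed only while the pool is above its floor. Restated as a standalone predicate (names mirror the accessors in this diff; the method itself is a sketch, not the committed code):

    // A pool already at its minimum size is never shrunk, which is why the
    // tests above lower the floor before asserting a downscale.
    static boolean shouldDownscale(boolean isDynamicScalingEnabled,
        double memoryLoad, double memoryThreshold,
        int numBuffers, int minBufferPoolSize) {
      return isDynamicScalingEnabled
          && memoryLoad > memoryThreshold
          && numBuffers > minBufferPoolSize;
    }
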


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
